Concurrencia y paralelismo en Node.js

  ·   15 minutos

En este artículo aprenderás como usar Node.js para ejecutar procesos intensos y distribuir la carga usando Worker Threads, una funcionalidad de esta plataforma que te permite aprovechar el CPU de forma efectiva. Además, entenderás como funciona la API de Worker Threads sin librerías y así ganarás un conocimiento más profundo sobre Node.js.

Introducción

Node.js es una plataforma para ejecutar Javascript y se basa en un solo hilo, es decir, aunque las operaciones son asincrónicas, el código de los programas se ejecuta en un sólo hilo.

El procesamiento multi-hilo que realiza Node.js se centra en las operaciones de entrada/salida como peticiones de red y lectura de archivos. Esta orquestación se logra con el Event Loop y es transparente para el programador.

Por qué es importante

Si un programa realiza operaciones bloqueantes, se pueden crear cuellos de botella que impidan la ejecución de todas las operaciones. De aquí nace el mantra no bloquees el Event Loop.

En las aplicaciones modernas el procesamiento de datos es de vital importancia, con el uso de concurrencia y paralelismo se gana más control para lograr un performance óptimo.

Dentro de los casos más comunes están el procesamiento de archivos multimedia, validar la integridad de archivos y la ejecución de algoritmos.

Uso básico

Veamos primero como se puede lanzar un hilo que realiza la pesada operación de contar hasta el trillón, sin bloquear el hilo principal, utilizando worker threads.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
const { Worker, isMainThread, parentPort, workerData } = require('node:worker_threads');
const { performance } = require('perf_hooks');

if (isMainThread) {
  // This is the main thread

  // Use the same file as the worker
  // Pass some data to the worker
  const workerThread = new Worker(__filename, {
    workerData: {
      countTo: 1000000000 
    }
  });

  // Setup event listener to receive messages from the worker
  workerThread.on('message', (message) => {
    console.log(timeOfExecution(), 'Message from worker:', message);
  });

  console.log(timeOfExecution(), 'Main thread can still do other work');
} else {
  // This is the worker thread
  console.log(timeOfExecution(), 'Worker thread is running with', workerData);

  // Get the data passed to the worker
  const { countTo } = workerData;
  console.log(timeOfExecution(), 'Worker thread will count to:', countTo);

  // Do some heavy work
  for (let i = 0; i < countTo; i++) {}

  // Send a message to the main thread
  parentPort.postMessage('done');
}

function timeOfExecution() {
  const ms = performance.now().toFixed(2);
  return `${ms}ms`;
}

Al ejecutar este programa, obtendrás la siguiente salida por consola

44.54ms Main thread can still do other work
82.85ms Worker thread is running with { countTo: 1000000000 }
644.04ms Message from worker: done
85.06ms Worker thread will count to: 1000000000

Aquí ya podemos notar las características principales de los hilos trabajadores

  1. La plataforma se encarga de juntar la salida estándar de los hilos.
  2. El hilo principal queda libre durante durante la ejecución del hilo trabajador.
  3. Es posible pasar variables a los hilos trabajadores.

Patrón productor-consumidor

Un patrón muy común en las aplicaciones es tener un hilo produciendo datos y otro procesándolos. A este patrón le llamamos patrón productor-consumidor y es muy útil para separar responsabilidades y para realizar un procesamiento eficiente.

En este ejemplo tendremos un hilo que produce valores y otro hilo que los procesa más lentamente.

En el hilo principal, se configuran los trabajadores y se muestran los tiempos en los que se producen y consumen los valores.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
const { Worker } = require('node:worker_threads');
const { timeOfExecution } = require("./timeOfExecution");

// Launch producer and consumer workers
const producer = new Worker('./producer.js');
const consumer = new Worker('./consumer.js');

// Producer sends data to consumer
producer.on('message', (message) => {
  if (message.type === 'data') {
    console.log(timeOfExecution(), "Produced:", message.data);

    consumer.postMessage({
      type: 'data',
      data: message.data,
    });
  }

  if (message.type === 'end') {
    consumer.postMessage({
      type: 'end',
    });
  }
})

// Consumer receives data from producer
consumer.on('message', (message) => {
  console.log(timeOfExecution(), "Consumed:", message.data);
});

// Show time of exit of each worker
producer.on('exit', () => {
  console.log(timeOfExecution(), "Producer exited");
});

consumer.on('exit', () => {
  console.log(timeOfExecution(), "Consumer exited");
});

// Show time of exit of main process
process.on('exit', () => {
  console.log(timeOfExecution(), "Main process exited");
});

En el productor envía datos al hilo padre simulando un trabajo pesado que toma segundo y medio.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
const { parentPort } = require('worker_threads');

for (let i = 0; i < 5; i++) {
  // Simulate some heavy work that takes 1.5 seconds
  const now = Date.now();
  while (Date.now() - now < 1500) {}

  // Produce data to be consumed by consumer worker
  parentPort.postMessage({
    type: 'data',
    data: i.toString(),
  });
}

// Send end message to main thread
parentPort.postMessage({
  type: 'end',
});

Por último, el consumidor lee los datos a través del canal de mensajes y simula un trabajo de cómputo de un segundo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
const { parentPort } = require("worker_threads");

function expensiveComputation(n) {
  // Simulate some heavy work that takes 1 second
  const now = Date.now();
  while (Date.now() - now < 1000) {}
  return n;
}

async function main() {
  // Queue to store data
  let queue = [];
  let finished = false;

  // Consumer receives data from producer
  parentPort.on("message", (message) => {
    if (message.type === "data") {
      queue.push(message);
    }

    if (message.type === "end") {
      finished = true;
    }
  });

  // While loop to consume data
  while (true) {
    // If finished and queue is empty, break the loop
    if (finished && queue.length === 0) {
      break;
    }

    // If queue is empty, wait for 1 second before checking again
    if (queue.length === 0) {
      await new Promise((resolve) => setTimeout(resolve, 1000));
      continue;
    }

    // Consume data, one at a time
    const message = queue.shift();
    const result = expensiveComputation(message.data);
    parentPort.postMessage({ type: "data", data: result });
  }
}

// Start the consumer
// When the consumer is done, exit the process
main().then(() => {
  process.exit();
});

Si ejecutamos este programa, encontraremos que su tiempo de ejecución es considerablemente menor respecto de ejecutar el mismo algoritmo de forma sincrónica. Recuerda que este es un ejemplo que puedes adaptar a las necesidades de procesamiento de tu aplicación.

Patrón de canales de mensajería

En el ejemplo anterior vimos como es podemos separar la producción y consumo de datos, sin embargo, el programa principal tiene la complejidad de orquestar el paso de mensajes entre hilos.

Afortunadamente, Node.js implementa la funcionalidad de MessageChannel, el cual permite la comunicación bidireccional entre dos hilos. Este patrón puede simplificar significativamente la implementación del patrón productor-consumidor.

En el programa principal, crearemos un canal que nos dará dos puertos o puntos de comunicación, uno de ellos se le pasará al productor y el otro al consumidor. Esto resultará en que ambos hilos pueden comunicarse entre sí.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
const { Worker, MessageChannel } = require('node:worker_threads');
const { timeOfExecution } = require("./timeOfExecution");

const channel = new MessageChannel();

// Launch producer and consumer workers
const producer = new Worker('./producer.js', {
  workerData: { port: channel.port1 },
  transferList: [channel.port1]
});

const consumer = new Worker('./consumer.js', {
  workerData: { port: channel.port2 },
  transferList: [channel.port2]
});

// Consumer receives data from producer
consumer.on('message', (message) => {
  console.log(timeOfExecution(), "Consumed:", message.data);
});

// Show time of exit of each worker
producer.on('exit', () => {
  console.log(timeOfExecution(), "Producer exited");
});

consumer.on('exit', () => {
  console.log(timeOfExecution(), "Consumer exited");
});

// Show time of exit of main process
process.on('exit', () => {
  console.log(timeOfExecution(), "Main process exited");
});

Tanto en el productor como en el consumidor, se tomará el puerto de comunicación desde los datos que inicializan el hilo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
const { workerData } = require('worker_threads');

const { port } = workerData;

for (let i = 0; i < 5; i++) {
  // Simulate some heavy work that takes 1.5 seconds
  const now = Date.now();
  while (Date.now() - now < 1500) {}

  // Produce data to be consumed by consumer worker
  port.postMessage({
    type: 'data',
    data: i.toString(),
  });
}

// Send end message to the given port
port.postMessage({
  type: 'end',
});

Por último, desde el consumidor, reportamos los datos calculados de nuestra simulación de cómputo intensivo al puerto del hilo padre.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
const { parentPort, workerData } = require("worker_threads");
const { port } = workerData;

function expensiveComputation(n) {
  // Simulate some heavy work that takes 1 second
  const now = Date.now();
  while (Date.now() - now < 1000) {}
  return n;
}

async function main() {
  // Queue to store data
  let queue = [];
  let finished = false;

  // Consumer receives data from producer
  port.on("message", (message) => {
    if (message.type === "data") {
      queue.push(message);
    }

    if (message.type === "end") {
      finished = true;
    }
  });

  // While loop to consume data
  while (true) {
    // If finished and queue is empty, break the loop
    if (finished && queue.length === 0) {
      break;
    }

    // If queue is empty, wait for 1 second before checking again
    if (queue.length === 0) {
      await new Promise((resolve) => setTimeout(resolve, 1000));
      continue;
    }

    // Consume data, one at a time
    const message = queue.shift();
    const result = expensiveComputation(message.data);
    parentPort.postMessage({ type: "data", data: result });
  }
}

// Start the consumer
// When the consumer is done, exit the process
main().then(() => {
  process.exit();
});

Patrón de pool de trabajadores

Crear hilos cada vez que se va a realizar una tarea es costoso a nivel sistema operativo, por lo cual lo que suelen hacer las aplicaciones más eficientes es crear un conjunto de hilos y reutilizarlos.

En este ejemplo implementaremos el patrón de pool de trabajadores, el cual consiste de un grupo de hilos que están listos para procesar datos en paralelo. Para esto, refactorizaremos la implementación anterior para que haya 2 consumidores procesando datos al mismo tiempo.

Primero, crearemos una clase Pool la cual manejará los trabajadores y orquestará la comunicación en el hilo principal.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
const { Worker,} = require('node:worker_threads');

class Pool {
  constructor(workerPath, size, port) {
    this.workerPath = workerPath;
    this.size = size;
    this.workers = [];
    this.port = port;

    // Create workers
    for (let i = 0; i < size; i++) {
      this.workers.push(new Worker(workerPath));
    }

    // Listen for messages in the port
    // and distribute them to workers
    this.port.on('message', message => {
      if (message.type === 'data') {
        this.postMessage(message);
      }
      
      if (message.type === 'end') {
        this.broadcast(message);
      }
    });
  }

  postMessage(message) {
    // Round-robin distribution
    const worker = this.workers.shift();
    worker.postMessage(message);
    this.workers.push(worker);
  }

  on(event, callback) {
    // Register callback for all workers
    this.workers.forEach(worker => worker.on(event, callback));
  }

  broadcast(message) {
    // Broadcast message to all workers
    this.workers.forEach(worker => worker.postMessage(message));
  }

  terminate() {
    // Terminate all workers
    this.workers.forEach(worker => worker.terminate());
  }
}

module.exports = { Pool }

Ahora, reconfiguremos el consumer para que se comunique con el hilo principal, el cual está a cargo del pool, y no del programa principal.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
const { Pool } = require('./pool');
const { Worker, MessageChannel } = require('node:worker_threads');
const { timeOfExecution } = require("./timeOfExecution");

const channel = new MessageChannel();

// Launch producer and consumer workers
const producer = new Worker('./producer.js', {
  workerData: { port: channel.port1 },
  transferList: [channel.port1]
});

const consumerPool = new Pool('./consumer.js', 5, channel.port2);

// Consumer receives data from producer
consumerPool.on('message', (message) => {
  console.log(timeOfExecution(), "Consumed:", message.data);
});

Acceso a recursos compartidos

Los hilos, además de ejecutarse en paralelo, pueden compartir memoria dentro del mismo proceso. De hecho, esta es una de las ventajas de usar Worker Threads en lugar de Child Process.

Esta funcionalidad está disponible mediante la API de SharedArrayBuffer y Atomics. El primero nos permite reservar un espacio en la memoria, que será compartido entre distintos hijos trabajadores. El segundo habilita acceder de forma segura a ese espacio de memoria.

Veamos un ejemplo de un contador atómico.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Create a SharedArrayBuffer with a size in bytes
const sharedBuffer = new Int32Array(new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT));
sharedBuffer[0] = 0;

// Prints 0
console.log(Atomics.load(sharedBuffer, 0));

// Add 1 to the shared buffer
Atomics.add(sharedBuffer, 0, 1)

// Prints 1
console.log(Atomics.load(sharedBuffer, 0));

// Subtract 1 from the shared buffer
Atomics.sub(sharedBuffer, 0, 1)

// Prints 0
console.log(Atomics.load(sharedBuffer, 0));

En los ejemplos anteriores, el consumidor hace polling a intervalos de un segundo para tomar trabajos pendientes y completarlos. Esta implementación, aunque intuitiva, es menos eficiente.

Una forma de mejorar la implementación anterior, es compartir un espacio de memoria entre los hilos que cuente los trabajos pendientes. De esta manera, los hilos trabajadores se activarán apenas tengan una tarea que deban realizar.

Refactorizaremos el Pool para que envíe a los trabajadores el espacio de memoria compartida y un nuevo canal donde recibirán los mensajes.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
const { Worker, MessageChannel } = require('node:worker_threads');

class Pool {
  constructor(workerPath, size, port) {
    this.workerPath = workerPath;
    this.size = size;
    this.workers = [];
    this.port = port;

    // Create workers
    for (let i = 0; i < size; i++) {
      const {
        port1: poolChannel,
        port2: jobChannel
      } = new MessageChannel();

      // Create shared buffer for worker
      const sharedBuffer = new Int32Array(new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT));

      // Initialize atomic counter
      sharedBuffer[0] = 0;

      // Create worker
      const worker = new Worker(workerPath);

      // Store worker, shared buffer and channel
      this.workers.push({
        worker,
        sharedBuffer,
        poolChannel,
        jobChannel,
      });

      // Send startup message to worker
      worker.postMessage({
        type: 'startup',
        payload: {
          sharedBuffer,
          jobChannel,
        }
      }, [jobChannel]);
    }

    
    // Listen for messages in the port
    // and distribute them to workers
    this.port.on('message', message => {
      if (message.type === 'data') {
        this.postMessage(message);
      }
      
      if (message.type === 'end') {
        this.broadcast(message);
      }
    });
  }

  postMessage(message) {
    // Round-robin distribution
    const worker = this.workers.shift();

    // Post message to worker
    worker.poolChannel.postMessage(message);

    // Increment atomic counter and notify worker
    Atomics.add(worker.sharedBuffer, 0, 1)
    Atomics.notify(worker.sharedBuffer, 0);

    // Add worker back to pool
    this.workers.push(worker);
  }

  on(event, callback) {
    // Register callback for all workers
    this.workers.forEach(worker => {
      worker.poolChannel.on(event, callback)
    });
  }

  broadcast(message) {
    // Broadcast message to all workers
    this.workers.forEach(worker => worker.poolChannel.postMessage(message));
  }

  terminate() {
    // Terminate all workers
    this.workers.forEach(worker => worker.worker.terminate());
  }
}

module.exports = { Pool }

Ahora, reimplementaremos el consumidor para que, en lugar de tomar trabajos pendientes a intervalos, simplemente espere a que el contador de trabajos pendientes sea distinto de cero.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
const { parentPort, receiveMessageOnPort } = require("worker_threads");

function expensiveComputation(n) {
  // Simulate some heavy work that takes 1 second
  const now = Date.now();
  while (Date.now() - now < 1000) {}
  return n;
}

parentPort.on("message", (message) => {
  if (message.type !== "startup") {
    return;
  }

  // First message received is the buffer
  const { jobChannel, sharedBuffer } = message.payload;

  // Start the consumer
  // When the consumer is done, exit the process
  main(sharedBuffer, jobChannel).then(() => {
    process.exit();
  });
});

async function main(sharedBuffer, jobChannel) {
  let finished = false;

  // While loop to consume data
  while (!finished || Atomics.wait(sharedBuffer, 0, 0) === "ok") {
    // Wait for the producer to produce data

    // Consume data
    let entry;
    while ((entry = receiveMessageOnPort(jobChannel)) !== undefined) {
      // Consume data, one at a time
      processMessage(entry.message);

      // Notify the producer that the data has been consumed
      Atomics.sub(sharedBuffer, 0, 1);
    }
  }

  function processMessage(message) {
    if (message.type === "data") {
      // Do the heavy computation and send the result back
      const result = expensiveComputation(message.data);
      jobChannel.postMessage({ type: "data", data: result });
    }

    if (message.type === "end") {
      // End of data reached
      finished = true;
    }
  }
}

El resto de las partes queda intacta, significando un refactor transparente para el productor y el programa principal.

Librerías de la comunidad

Implementar tus propios patrones sirve a tu conocimiento de ingeniería de software y programación. Según tus necesidades, puede ser conveniente usar librerías de la comunidad.

Una de las más recomendables es Piscina, la cual implementa el patrón pool de trabajadores con algunas características muy interesantes:

  • Permite reutilizar un mismo pool de trabajadores para distintas tareas.
  • Envía las tareas al primer trabajador disponible.
  • Canaliza los resultados de las tareas de forma automática.

Conclusiones

En este artículo aprendiste cómo hacer concurrencia y paralelismo en Javascript con Node.js. Ahora sabes como utilizar hilos trabajadores, canales de mensajería y operaciones atómicas.

Además, sabes como implementar patrones de programación concurrente en tus aplicaciones.

Te invito a pensar qué procesos en tu aplicación pueden apalancarse de esta funcionalidad para dar una mejor experiencia de usuario. Si tu aplicación realiza tareas grandes y complejas, sacarlas del hilo principal mejorará significativamente el performance.

Aunque lo visto en este artículo sea suficiente para mejorar el rendimiento en muchos casos, si tu proyecto depende en gran medida de procesamiento concurrente, te recomiendo aplicar lenguajes más especializados para este fin, como lo pueden ser Go o Java.