kakts-log

programming について調べたことを整理していきます

worker_threadsを使ったNode.js マルチスレッドプログラミング

qiita.com
この記事は Qiita Node.js Advent Calendar 2018 14日目の記事となります。

今回は Node.js のWorker Threadsに関してまとめます。

本記事では、前提としてNode.js v10.14.1 における内容をまとめています。
worker_threadはexperimentalな機能なので、今後仕様が変わりうるので注意が必要です。
Worker Threads | Node.js v11.4.0 Documentation

worker_threadsとは

Node.js 10.5.0より、Node.jsでのスレッドプログラミングを可能にするworker_threadsモジュールが追加されました。
現時点Node.js10.14.1 では、 Node.jsのプログラム実行時に --experimental-workerフラグを指定することで worker_threadsモジュールを利用可能です。

worker_threadsを取り入れたPR や、contributorによるFAQを読んでみると、このモジュールが取り入れられた背景を知ることができます。 github.com

workers-faq.md · GitHub

worker_threadを使う際の注意点として、 I/Oバウンドな処理でなく、CPUバウンドな処理に用いると効率的になります。

また、Workerスレッド間では、child processやclusterモジュールと違って、ArrayBufferインスタンスの転送や、SharedArrayBufferインスタンスの共有によって、メモリ空間を効率的に共有できます。

サンプルコード

Node.js のドキュメントに記載されているサンプルコードを抜粋すると以下となります。

const {
  Worker, isMainThread, parentPort, workerData
} = require('worker_threads');

if (isMainThread) {
  module.exports = async function parseJSAsync(script) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: script
      });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0)
          reject(new Error(`Worker stopped with exit code ${code}`));
      });
    });
  };
} else {
  const { parse } = require('some-js-parsing-library');
  const script = workerData;
  parentPort.postMessage(parse(script));
}

mainスレッド、workerスレッドで行う処理は、 isMainThreadがtrueかどうかで判定して分岐させています。
もちろん、Workerインスタンスを生成する際の引数で別のファイルを指定することで、mainスレッド、Workerスレッドの処理を別ファイルで分けてかけます。

ここではmainスレッドでの実行時に Promise内でWorkerインスタンスを生成し、 Workerスレッド側で parentPort.postMessageによってmainスレッドにメッセージを送信しています。

続いて、worker_threadsを理解する上で重要なWorker, MessagePortクラスについて説明します。

worker_thread モジュールについて詳しく

ここでは worker_threadを使う際の各機能の解説をします。
詳細を知りたい方は公式ドキュメントを参照ください。
Worker Threads | Node.js v11.4.0 Documentation

worker.isMainThread

実行中のプログラムがmainスレッドで動作している場合はtrue, workerスレッド内で実行されている場合はfalseとなります。
前述したように、main, workerで処理を分けたい場合はこれで判定します。

worker.parentPort

parentPortはMessagePortクラスのインスタンスで、mainスレッドとコミュニケーションを可能にします。 parentPort.postMessage() を使用して送信されたメッセージは mainスレッド環境でworker.on('message')イベントのハンドラ内で使うことができます。

MessagePortクラスについては後ほど説明します。

worker.threadId

現在のスレッドを識別するための整数idです。
mainスレッドでは0 workerスレッドでは生成した順に1から増えていきます。

https://github.com/nodejs/node/blob/v10.14.1/src/node_worker.cc#L35

uint64_t next_thread_id = 1;

https://github.com/nodejs/node/blob/v10.14.1/src/node_worker.cc#L70-L74

  // Generate a new thread id.
  {
    Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex);
    thread_id_ = next_thread_id++;
  }

ちなみに、worker.isMainThreadは、 このthreadIdが0かどうかで判定しています。
https://github.com/nodejs/node/blob/v10.14.1/lib/internal/worker.js#L31

const isMainThread = threadId === 0;

worker.workerData

Workerインスタンスの生成時にコンストラクタに渡されたデータを、workerスレッドでの実行時に使うことができます。

Workerインスタンス生成時にここでセットされ https://github.com/nodejs/node/blob/v10.14.1/lib/internal/worker.js#L311

workerスレッドでの実行時に workerDataとして使える様になっています。
node/worker.js at v10.14.1 · nodejs/node · GitHub

ここで、上記のプロパティをつかって簡単なプログラムを書いてみます。

const {Worker, isMainThread, workerData, parentPort, threadId} = require('worker_threads');

if (isMainThread) {
  // mainスレッド
  console.log(`Main Thread!!! isMainThread: ${isMainThread}, threadId: ${threadId}`);

  // 4つ作成する
  for (let i = 0; i < 4; i++) {
    const worker = new Worker(__filename, { workerData: `hello: ${i}` });
    worker.on('message', (message) => {
      console.log(`[main] message got from worker : ${message}`);
    });

    worker.postMessage("from tokyo")

  }
} else {
  // workerスレッド
  console.log(`[worker] workerData: ${workerData} isMainThread: ${isMainThread}`)
  parentPort.postMessage(`ThreadId: ${threadId}`)
}

実行結果は以下のとおりです。

$ node --experimental-worker app.js
Main Thread!!! isMainThread: true, threadId: 0
[worker] workerData: hello: 0 isMainThread: false
[main] message got from worker : ThreadId: 1
[worker] workerData: hello: 1 isMainThread: false
[main] message got from worker : ThreadId: 2
[worker] workerData: hello: 2 isMainThread: false
[main] message got from worker : ThreadId: 3
[worker] workerData: hello: 3 isMainThread: false
[main] message got from worker : ThreadId: 4

Workerクラス

Worker クラスは 独立したJavascript実行スレッドを表現するものです。 worker_threadモジュールにおいておそらく一番重要なので、詳しくまとめます。

多くのNode.jsAPiはこの実行スレッド内で利用可能です

Worker 環境内での注目すべき違いは以下のとおりです。

- process.stdin, process.stdout, process.stderr はmainスレッドにリダイレクトされる  
- require('worker_threads').isMainThread プロパティはfalse  
- require('worker_threads').parentPort メッセージポートは利用可能  
- process.exit() はプログラム全体を停止せず、ただ単一のスレッドのみ停止します  
- process.abort()は利用不可
- process.chdire()やファイルグループやユーザを設定するprocess系のメソッドは利用不可  
- process.env は環境変数に対して読み込み専用  
- process.title は変更不能  
- シグナルは process.on()経由では転送されない  
- worker.terminate()が読み出された時点で 実行が停止する  
- 親プロセスからのIPC channelはアクセス不能
- trace_eventsモジュールはサポートされない

現時点で 下記の項目関しては対応待ちとなっているようです。

- inspector モジュールの使用
- Nativeアドオンの使用

なお、Workerスレッド内で workerインスタンスを作成することは可能です。

Workerクラスの内部実装

WorkerクラスについてのNode.jsの実装を見ていきます。
Workerクラスは、EventEmitterを継承しています。
https://github.com/nodejs/node/blob/v10.14.1/lib/internal/worker.js#L252

スレッド間でのメッセージ受信や、スレッド終了のタイミングでイベントを発火します。
Workerクラスのコンストラクタでインスタンス生成処理の最後でスレッドを開始します。 https://github.com/nodejs/node/blob/v10.14.1/lib/internal/worker.js#L316

new Workerによってインスタンスが生成されるタイミングで、Workerスレッドが始動します。
node/src/node_worker.ccの方で、livuvのuv_thread_createをつかってスレッド開始しているのがわかります。

https://github.com/nodejs/node/blob/v10.14.1/src/node_worker.cc#L417-L419

void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
  Worker* w;
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
  Mutex::ScopedLock lock(w->mutex_);

  w->env()->add_sub_worker_context(w);
  w->stopped_ = false;
  w->thread_joined_ = false;

  w->thread_exit_async_.reset(new uv_async_t);
  w->thread_exit_async_->data = w;
  CHECK_EQ(uv_async_init(w->env()->event_loop(),
                         w->thread_exit_async_.get(),
                         [](uv_async_t* handle) {
    static_cast<Worker*>(handle->data)->OnThreadStopped();
  }), 0);

  CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) {
    static_cast<Worker*>(arg)->Run();
  }, static_cast<void*>(w)), 0);
}

Node.jsでのプログラム実行時に、Workerコンテキストの場合は 下記の場所でWorkerスレッドに関するセットアップが行われます。

https://github.com/nodejs/node/blob/v10.14.1/lib/internal/bootstrap/node.js#L220-L223
https://github.com/nodejs/node/blob/v10.14.1/lib/internal/worker.js#L438-L511

さらにworker_threadの挙動について知るためには、 このlivuvのuv_thread_createについて調べる必要がありますが、今回は説明しません。

MessageChannelクラス

MessageChannelクラスのインスタンスは 非同期の2-wayコミュニケーションチャネルを提供します。 javascriptのMessageChannelクラスを踏襲しているようです。

developer.mozilla.org

MessageChannelのクラス自体はメッセージを持っておらず、 new MessageChannel()生成したMessageChannelインスタンスは、port1 port2プロパティを持っています。
それぞれがMessagePortクラスのインスタンスであり、双方向にやりとりできます

const { MessageChannel } = require('worker_threads');

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });

MessagePortクラス

MessagePortクラスのインスタンスは非同期の2wayコミュニケーションチャンネルの片方のチャンネルを表現するものとなります。

Workerクラスと同様にEventEmitterを継承しています。
https://github.com/nodejs/node/blob/v10.14.1/lib/internal/worker.js#L69-L71

// Set up the new inheritance chain.
Object.setPrototypeOf(MessagePort, EventEmitter);
Object.setPrototypeOf(MessagePort.prototype, EventEmitter.prototype);

前述したコードでも書いているように、MessagePort#postMessage()によって、スレッド間でデータをやり取りできます。
SharedArrayBufferを使えば、main worker間でcloneせずにメモリを共有してやり取りできます。

worker_threadで大事なものに関しては以上のとおりです。 実際にプログラムを書きつつ調べると理解が深まるかと思います。

Workerスレッドでcpuバウンドな処理をさせてみる。

一通り説明できましたので、ここでmainスレッドと\workerスレッドでプログラムを分けた上で、5つのWorkerスレッドで重いループ処理を行うプログラムを書きます。

main workerで行う処理をざっくり説明すると以下のとおりです。
- mainスレッド: 5 つのWorkerスレッドを起動し、それぞれのイベントハンドラを設定する
- workerスレッド: 2重ループの重い処理を行う 終了後mainスレッドにメッセージを投げる

まず、mainスレッド用の実装を main_thread.jsに記述します。
main_thread.js

const {isMainThread, Worker} = require('worker_threads');

function createWorker(path, wd) {
  const w = new Worker(path, {workerData: wd});

  w.on('error', (err) => {
    console.error(`Worker ${w.workerData} error`)
    console.error(err);
  });

  w.on('exit', (exitCode) => {
    console.log(`exitted! : ${wd}`);
  });

  w.on('message', (msg) => {
    // workerスレッドから送信されたメッセージ
    console.log(`[Main] Message got from worker ${msg}`)
  });
  return w;
}

console.log(`[Main] isMainThread: ${isMainThread}`);
for (let i = 0; i < 5; i++) {
  const worker = createWorker('./heavy_thread.js', i);
}

ここでは worker_threadsモジュールをrequireしてisMainThreadとWorkerを読み込んでます。

今回は Workerスレッドの生成処理を createWorker関数にまとめていて、内部でnew Workerしてインスタンスを生成します。
Workerスレッドで行う処理は heavy_thread.js というファイルにまとめていて、new Workerの第1引数にパスを指定します。

heavy_thread.jsの実装は下記のとおりです。
heavy_thread.js

const {parentPort, workerData, isMainThread, threadId} = require('worker_threads');


console.log(`[Worker] isMainThread: ${isMainThread}, workerData: ${workerData}, threadId: ${threadId}`);
let count = 0;
for (let i = 0; i < 10000; i++) {
  for (let j = 0; j < 10000; j++) {
    count++;
    count /= 3
    count = count * count * count
  }
}

parentPort.postMessage(`[Worker] Finished ${workerData}`)

これを実行すると以下のようになります
前述したように、実行時に--experimental-workerフラグを付ける必要があります

$ node --experimental-worker parent_thread.js
[Main] isMainThread: true
[Worker] isMainThread: false, workerData: 0, threadId: 1
[Worker] isMainThread: false, workerData: 1, threadId: 2
[Worker] isMainThread: false, workerData: 2, threadId: 3
[Worker] isMainThread: false, workerData: 3, threadId: 4
[Worker] isMainThread: false, workerData: 4, threadId: 5
[Main] Message got from worker [Worker] Finished 0
[Main] Message got from worker [Worker] Finished 1
exitted! : 0
exitted! : 1
[Main] Message got from worker [Worker] Finished 2
exitted! : 2
[Main] Message got from worker [Worker] Finished 3
exitted! : 3
[Main] Message got from worker [Worker] Finished 4
exitted! : 4

これでmain・workerスレッド間でメッセージをやりとりできました。

worker_threadを使わない場合との速度比較

次に、先程書いたworkerスレッドを使った処理と、 単一ファイルでworkerスレッドを使わずシングルスレッドで処理を行った際の 速度について調べます。
厳密に同じ処理ではないので正しく計測はできていないですが。 シングルスレッドでの実行プログラムは以下のように書きました。
10000 * 10000のループ処理を5回行うプログラムです。

single.js

console.log('[Single] start')
// cpuバウンドの重い処理
let count = 0;
for (let k = 0; k < 5; k++) {
  for (let i = 0; i < 10000; i++) {
    for (let j = 0; j < 10000; j++) {
      count++;
      count /= 3
      count = count * count * count
    }
  }
}
console.log('[Single] finish')

上述のsingle.jsと、 --experimental-workerフラグを付けて実行したmain_thread.jsの実行時間を比較します。

シングルスレッド・workerを使ったマルチスレッドでともに10回ずつ実行した際の平均実行時間は以下のとおりです 実行時にtimeコマンドを使って算出したものです。

real(s) user(s) sys(s)
シングルスレッド(single.js) 5.4545 5.4207 0.0289
マルチスレッド 1.294 5.8129 0.1398

結果を見てみると、workerを使ったマルチスレッド環境では 完了時間が 約1/4となり、かなり実行時間を短縮できたことがわかりました。

補足ですが、worker_threadを使った実行において、 real < user となっていて気になったので調べたのですが、 この場合、マルチコア・マルチスレッドでの並列実行の恩恵をうまく受けていること示唆する結果とのことでした。

参考 unix.stackexchange.com

The rule of thumb is:
real < user: The process is CPU bound and takes advantage of parallel execution on multiple cores/CPUs.
real ≈ user: The process is CPU bound and takes no advantage of parallel exeuction.
real > user: The process is I/O bound. Execution on multiple cores would be of little to no advantage.

さいごに

worker_threadモジュールによって、Node.jsでのマルチスレッドプログラミングが可能になり、cpuバウンドな処理をさせたいときには活用したいと思いました。

多少仕組みを学ぶのに労力がかかったのですが、worker_threadのラッパーであるmicrojobというモジュールも人気なので、こちらを使うと楽に書けるのではと思います。 github.com