はじめに
この記事はenechain Advent Calendar 2024の4日目の記事になります。
こんにちは!enechainでソフトウェアエンジニアをしている古瀬です。
enechainでは今年10月にリリースした「eSquare Live」の開発を初期から担当しています。
「eSquare Live」は株式取引やFXのようにリアルタイムで電力商品を取引できるオンラインプラットフォームです。
今回は「eSquare Live」でもリアルタイム通知のために利用しているSocket.IO Adapterについて実装例を交えて紹介したいと思います。
Socket.IO Adapterとは
Socket.IOはリアルタイムな双方向通信システムにおいて非常に重要かつ強力な機能を持ったOSSです。
その特徴としては以下のようなものが挙げられます。
- 柔軟な通信プロトコル: WebSocket非対応環境やネットワークが不安定な環境下でHTTP Pollingを選択できる
- 自動切断管理: コネクション断後に自動で再接続する
- イベント駆動モデル:
emit
やon
でイベントとしてメッセージを処理する
「eSquare Live」でも注文イベントなどをリアルタイムでユーザーに通知するため利用しており、可用性向上のためSocket.IOサーバーを複数台稼働させています。
Socket.IOを複数台構成で運用する場合、各サーバー間でクライアント情報やイベントデータを共有する必要があります。この課題を解決するために用いられるのがSocket.IO Adapterです。
デフォルトのAdapterはin-memoryでシングルサーバーでのみ動作しますが、カスタムAdapterを利用することでサーバー間通信が可能になります。
一例としてSocket.IO公式のカスタムAdapterであるRedis Adapterは下図のように連携してサーバー間で情報を共有しています。
この例ではRedisを介してサーバー間で通信していますが、それ以外にもMongoDBやPostgreSQL、3大クラウドのメッセージキューをベースにしたAdapterも公開されています。
接続状態の回復
ここで一度Adapterから離れてクライアント-サーバー間のコネクションが瞬断した場合を考えます。
Socket.IOサーバーは接続状態の回復をサポートしており、以下のオプション指定で設定が有効になります。
const io = new Server(httpServer, { connectionStateRecovery: {} // キーが存在していればOK });
この設定を有効にすることで意図しない瞬断が発生しても接続IDやセッションデータを引き継いで再接続(=接続を回復)できます。
express
とsocket.io
を使った最小構成のSocket.IOサーバーを立てて実験してみましょう。
- server.js
起動方法
npm install express socket.io node server.js
const express = require("express"); const { createServer } = require("http"); const { Server } = require("socket.io"); const server = createServer(express()); const io = new Server(server, { connectionStateRecovery: {}, }); io.on("connection", (socket) => { // 何回目の接続かをカウントする socket.data.count = (socket.data.count || 0) + 1; socket.emit("message", `connected count: ${socket.data.count}`); }); server.listen(3000, () => console.log("Server is running on port 3000"));
※ 最低でも1度サーバーからクライアントへ向けてメッセージを送信しないとセッション情報は保持されません
- client.html
<!DOCTYPE html> <html lang="ja"> <body> <p id="text"></p> <script src="https://cdn.socket.io/4.7.2/socket.io.min.js"></script> <script> const socket = io('http://localhost:3000', { transports: ['websocket'] }); // 2秒ごとに瞬断を再現 // see: https://socket.io/docs/v4/connection-state-recovery#usage socket.on('connect', () => { setTimeout(() => { if (socket.io.engine) { socket.io.engine.close(); } }, 2_000); }) // p#textにサーバーから受信したメッセージを表示する socket.on('message', (msg) => { document.getElementById('text').textContent = msg; }); </script> </body> </html>
これらのコードを実行すると2秒毎にコネクションが切断→再接続され、再接続のたびにカウンターがインクリメントされる動作を確認できます。
(connectionStateRecoveryを指定せず設定を無効にするとこの挙動は再現されません)
これはセッションデータをsocket.data
に保持しており、クライアントの再接続時に前回のセッションを復元することでカウンターのインクリメントを実現しています。
IDやroom(コネクションのグループのようなもの)の情報も引き継がれるため、前回のイベントが中断してしまった場合でも中断した時点からイベント処理を再開させることができます。
イベントやセッションデータの持ち方など、さらに詳しく仕組みを知りたい方は公式ドキュメントのConnection state recoveryも参照してみてください。
Adapterの選択
ここまでAdapterと接続状態の回復についてみてきましたが、
- 複数台のサーバー構成
- クライアント-サーバー間の瞬断耐性
- サーバー-Middleware間の瞬断耐性
に対応できるAdapterの選択肢はあまり多くなく、
のどちらかを選択することになります。
「eSquare Live」では比較的高速かつ導入が容易なRedis Streams Adapterを採用することになりました。
(参考) Redis AdapterとRedis Streams Adapterの比較
Socket.IOのAdapterにはRedis Adapterというものも存在します。
Redis AdapterはRedisのPub/Subの機能を利用するのに対して、Redis Streams AdapterはStreamsを利用します。
Pub/Sub | Streams | |
---|---|---|
データモデル | シンプルなメッセージング | 追記型データストリーム |
永続化 | ✕ | ◯ |
メッセージの再送 | ✕ | ◯ |
メッセージ順序の保証 | ✕ | ◯ |
メッセージの保持 | ✕ | ◯ |
監視 | ✕ | ◯(ACK情報を監視可能) |
遅延の許容 | 低遅延 | ACKやデータ保持のためやや遅延あり |
上表の通り、Redis Adapterが利用するPub/Subはat most once配信でクライアントが瞬断してしまった場合にデータが消失します。
また、Socket.IO-Redis間のコネクションの瞬断でもデータ消失が発生するため信頼性を担保することが難しい構成になっています。
Redis Streams Adapterの実践導入
AdapterをSocket.IOサーバーに接続するためには以下の3ステップが必要です。
- 必要なパッケージをインストール
- Adapterのインスタンスを生成
- ServerインスタンスにAdapterを設定
この3ステップをもとに先程のserver.jsを書き換えてみます
- server.js
install
npm install @socket.io/redis-streams-adapter redis
const express = require("express"); const { createServer } = require("http"); const { Server } = require("socket.io"); // 1. インストールしたパッケージ const { createClient } = require("redis"); const { createAdapter } = require("@socket.io/redis-streams-adapter"); // 2. Adapterのインスタンスを生成 const redisClient = createClient({ url: "redis://localhost:6379" }); const redisStreamsAdapter = createAdapter(redisClient); const server = createServer(express()); const io = new Server(server, { connectionStateRecovery: {}, // 3. ServerインスタンスにAdapterを設定 adapter: redisStreamsAdapter, }); io.on("connection", (socket) => { // 何回目の接続かをカウントする socket.data.count = (socket.data.count || 0) + 1; socket.emit("message", `connected count: ${socket.data.count}`); }); server.listen(3000, () => console.log("Server is running on port 3000"));
※ redisサーバーが必要
実装はこの通り簡単なのですが、落とし穴としてインフラの設定でSticky-sessionを必ず有効にする必要があります。
「eSquare Live」開発当初もこの設定が抜けており、セッション情報がうまく引き継げませんでした。
Sticky-sessionはクライアントの情報を元に、どのサーバーにルーティングするかを制御する仕組みです。
IPアドレスとCookieを選択できますが、Socket.IOの瞬断対策としてはIPアドレスはインターネット接続ごとに変更される可能性があるためCookieで制御することが望ましいです。
弊社ではKubernetes(GKE)を利用しているため、この制御はBackendConfigのsessionAffinityで行っています。
apiVersion: cloud.google.com/v1 kind: BackendConfig metadata: name: socketio-bc spec: sessionAffinity: affinityType: "GENERATED_COOKIE" affinityCookieTtlSec: 10800
NginxやApache、クラウドサービスのLB単位でも設定が可能なので、ご利用のインフラ構成に応じてSticky-session、Session Affinityを設定してください。
まとめ
リアルタイムな双方向通信を実現するSocket.IOは、その柔軟性と機能性から、多くのアプリケーションで利用されています。
しかし、可用性を高めた構成を実現するためには、次のようなポイントを押さえる必要があります。
- Connection state recoveryに対応するAdapterの選択
connectionStateRecovery
の設定- Sticky-sessionの有効化
Socket.IOは、手軽な導入でリアルタイム通信を実現できますが、少しの工夫でさらに強力なシステムを構築できます。
今回の内容が、リアルタイム通信の信頼性向上に役立てば幸いです。
enechainでは、事業拡大のために共に技術力で道を切り拓いていく仲間を募集しています。