がるの健忘録

エンジニアでゲーマーで講師で占い師なおいちゃんのブログです。

LaravelのMessage Queueを把握してみる:前提としてのMessage Queueそのもの

ちょいと「LaravelのMessage Queue」について調べる用事があったので。
調べ物の備忘録用に書いているので、間違いとかあるかもしれないので
・使う時は気をつけてください
・間違いに気付いたらコメントなどで突っ込んでいただければ、と思います。
という言い訳を書いておく(笑

さて。

まず「Message Queue」ってのが、これは「システムの一般的な概念」として存在します。
大昔は、messnd() / msgrcv() とか mq_send() / mq_receive() とかを使ってましたねぇ。プロセス間通信と呼ばれるものの一つで、まぁそのまんま「メッセージキュー」って呼称だったと記憶しております。
messnd() とか mq_send() とかでenqueue、msgrcv()とかmq_receive()でdequeueする、基本的には「キュー構造を持つデータ構造」ってだけです。「キュー構造はFIFOって言ってね」って下りは、省略してよいよね?
なので本質的には「なにがしか(通常は文字列)をenqueueで積んで、(だいたい)積んだ順番にdequeueで取り出す」機能です。

でまぁ、これを「何に使ってもよい」のですが、「積む速度にムラがあったりするようなタスクが、時々"ドカっと"積まれたりするのを、ちまちまと処理していきたい」なんて時に、よく使われまして。
Message Queueのよいのが「積むのはどんなにムラのある積み方をしても、処理は基本的に"一定のペースで片付ける"」って所にあります。なので、インフラリソースがある程度計算しやすいんですな。
この「お願いする」と「実際に処理する」が別々に動くのを「非同期」なんて言い方をしたりもしますねぇ、なのでMessage Queueは「非同期」でございます。
なお、積む「お仕事」は、いわゆる文字列(string)であればフォーマットは「お好み」なので、適宜好きな電文フォーマットをご用意ください。わかりやすくパースしやすいドメイン固有言語(DSL)をざっくり設計するか、PHPとかだと「配列をjsonなりserializeなり」ってのも手かなぁ、とは思います。
積むお仕事の事をタスクとかjobとか言っていたりする事もあるんで、job queueなんて呼称も耳にしますねぇ。task queueって呼称もありますが、job queueよりは聞かない……かもしれない。

んで。

「非同期でお仕事を処理する」って文脈でMessage Queueを語る場合。
「積み上げる先はどこ?」「積んだお仕事を誰がどんな風に処理して消化していくの?」ってあたりが重要になります。
ので、かみ砕いて。

まず「積み上げる先は?」ってのがあって。
例えば先にあげた「messnd() とか mq_send()」だと、これは「OSで実装されているプロセス間通信(IPC)」で、まぁ「ミドルウェアとかあんまり期待できなかった頃」には高頻度で使われていたものでございます*1。
でまぁ一気に時間軸を「現在」に持ち上げると……いやまぁ「RDBで自力実装」とかもありなのですが、AWS SQSとか、Apache Kafkaとか、ようは「高圧でデータをため込んで、それなりに障害耐性があって、そこそこの速度で"ある程度選択して"データを読み出せる」機構であれば、ため込む先としては十分でございます……のであとは要件に合わせて。

お次に「どうやって仕事を消化するの?」については、それこそまぁ「バッチでも組め」ってお話になるので。
基本的には「メッセージがある間は、グルグルとぶん回して処理をし続けるバッチを組む」ってのが基本になります。

積み上げるお仕事の依頼文フォーマットについてはまぁ「ご随意に」。
わかりやすく書きやすく後でメンテナンスしやすい……なんてことを考えておくとよろしいかと。

んでまぁそうすると次に考察しないといけないのが以下の事象です。
・お仕事消化側、延々と、プロセスが24時間365日動いているとして、メモリリークとか大丈夫?
・同じくお仕事消化側、「お仕事がない」時って、バッチはどうしてるの?

まずメモリリークについては、おいちゃんが普段PHPを使っているので、その場合端的に「そこまでの信頼性はない」と考えているので(身も蓋もない)。
そうするとまぁ、仕組みとしては「一定時間、または一定回数働いたら、一度そのバッチプロセスを落とす」って実装を、よくやります。
つまり

while($message = dequeue()) {
    お仕事
}

ではなくて

while($message = dequeue()) {
    // 回数確認
    if (一定回数 <= ++$作業カウンタ) {
        終了処理してプロセスをexit;
    }
    // else
    お仕事
}

ってな感じにしておきます(回数制限の場合)。
そうなると「死んだら次のバッチを誰が起こすの?」ってのがあるんですが、ちょいと棚の上にpush。

お次。
「「お仕事がない」時って、バッチはどうしてるの?」については、2種類の見解が、一応ありまして。
・お仕事がない時は、sleepとかで待つ
・お仕事がない時は、一端exitで終了する
となります(レアケース的に別ラインも存在しますが、面倒なんで省略)。

でまぁ、おいちゃんの場合は「sleepで待ってもメモリとか食うしなぁ」となるので、基本「落ちる/落とす」事が多いです。

……となると、お仕事がないと落ちるので。つまりは「お仕事をやりすぎても」「お仕事がなくても」落ちることになります。
なので、メモリリークで棚に上げていた「死んだら次のバッチを誰が起こすの?」ってお題をpopさせて、話をmergeしていきます。

お題は「お仕事消化バッチのプロセスをいかにして(再度)立ち上げるか」。

個人的にはここは「cronで毎分起動 + セマフォで多重制御」1択かなぁ、と。
雑ではありますが、色々と楽ですしねぇ。
やり方は簡単で、以下の通りです。

・お仕事消化バッチは、毎分cronで起動される(1つかもしれないし、n個同時に起動かもしれないし、その辺は多重度の設定に合わせてお好みで)
・お仕事消化バッチはセマフォで多重起動が管理されているので、毎分動かしても「余ったプロセスは即時終了」となる
 → なので、セマフォの設定は「待つ」じゃなくて「すぐに落ちる」にする

ってくらいかなぁ、と。
そうするとまぁ、cron設定するくらいでコントロールが出来るので、手間がなくて楽でございます。
cronで動かすのを「1つのバッチ」とかにして、そこで「何多重同時に起動するか」を設定したり書いておいたりすると、「ソースコード管理の範囲で全部賄える」ですしおすし。

なお次に出てくるのが
・コード修正したの、即時に反映したいんだけど、どうすりゃいいの?
ってのがあって(バグとか)。

まぁ真面目に考えると「シグナルとか実装するかね? https://www.php.net/manual/ja/function.pcntl-signal.php 」ってなるのですが、PHPで「そこまでするかね?」ってのが、なくもないので(必要ならやるけどおいちゃんは)(「PHPでシグナル」は、後でBlogに書いておきましょうかねぇ)。
もうちょっとざっくりやるときは
・graceful の機能を作っておく
 → 大体「ファイルを置いておいて、そのファイルが存在するときは処理のloopの最後でexitする」くらいのざっくり実装が多いです

ってのを前庭に

・新しいコードをデプロイ
・gracefulファイルをscpとかrsyncとかで置く
・ちょいと待つ(1分もあれば、全てのプロセスがgraceful踏んで一度exitするでしょ。真面目にやるんならpsでプロセス起動時刻を確認)
・gracefulファイルをsshとかrsyncとかで削除

ってやると、まぁ、片付きます。
バッチサーバが何十何百もあると面倒なのですが、2~3台まで、くらいならこんな手順(の半自動化)くらいでも、結構いけるのではないかなぁ、と。
(大体自動化できるんで、台数増えてもぶっちゃけそんなに困りません)

あと、スケールアウトで「バッチサーバが複数台」ある時とかは。
このやり方は基本「非同期」なので、例えば外との通信の都合があって「全体として、ある瞬間に10処理までに押さえたい」場合、「サーバ1台辺りの多重起動数」を「10 / サーバ台数」ってしておけば、普通にコントロール可能です。
多分「このサーバはaとbのバッチ、このサーバはcとdのバッチ」ってやるよりも「全体的に薄く手を出す」ようにしたほうが、メンテとかが楽です。1台落ちても「性能がうっすら落ちる」だけで、甚大な被害とかが出なくなりますし。

……なんか他にも色々あった気がするのですが、今回の本題は「Laravelでの実装確認」なので、前説はこれくらいで。

*1:が、PHPにも https://www.php.net/manual/ja/function.msg-send.php とかあるし、「もう使われていないのか?」と問われると、「ど~なんだろうねぇ?」とか、なんとか