Tech Racho エンジニアの「?」を「!」に。
  • Ruby / Rails関連

Solid Queue README -- DBベースのActive Jobバックエンド(翻訳)

概要

MITライセンスに基づいて翻訳・公開いたします。

日本語タイトルは内容に即したものにしました。

  • 2024/09/27: 更新

basecamp/solid_queue - GitHub

Solid Queue README -- DBベースのActive Jobバックエンド(翻訳)

Solid Queueは、Active Jobで利用できるDBベースのキューバックエンドであり、シンプルさとパフォーマンスを念頭に置いて設計されています。

通常のジョブエンキューや処理に加えて、ジョブの延期、コンカレンシー制御、キューの一時停止、数値によるジョブ単位の優先度指定、キュー順序に基づいた優先度指定、バルクエンキュー(Active Jobのperform_all_laterで使われるenqueue_all)もサポートしています。

Solid Queueは、MySQL、PostgreSQL、SQLiteなどのSQLデータベースで利用可能で、FOR UPDATE SKIP LOCKED句でジョブポーリング時のブロッキングやロック待機を回避します(利用可能な場合)。Solid Queueのジョブリトライ、ジョブ破棄、エラー処理、シリアライズ、ジョブの延期はActive Jobに依存しており、Ruby on Railsのマルチスレッドと互換性があります。

🔗 インストール方法

Rails 8の新規アプリケーションでは、デフォルトでSolid Queueが設定されます。ただし、それより前のバージョンのRailsを実行している場合は、以下の手順に沿うことで手動で追加できます。

  1. bundle add solid_queue
  2. bin/rails solid_queue:install

上のコマンドを実行することで、Solid Queueがproduction向けのキャッシュストアとして設定され、config/queue.ymlconfig/recurring.ymlが作成されます。また、bin/jobs実行可能ラッパーも作成され、ここからSolid Queueを起動できます。

次に、config/database.ymlファイルにキュー用のデータベース設定を追加する必要があります。SQLiteを使っている場合は、以下のような設定になります。

production:
  primary:
    <<: *default
    database: storage/production.sqlite3
  queue:
    <<: *default
    database: storage/production_queue.sqlite3
    migrations_paths: db/queue_migrate

MySQL/PostgreSQL/Trilogyを使っている場合は、以下のような設定になります。

production:
  primary: &primary_production
    <<: *default
    database: app_production
    username: app
    password: <%= ENV["APP_DATABASE_PASSWORD"] %>
  queue:
    <<: *primary_production
    database: app_production_queue
    migrations_paths: db/queue_migrate

: bin/rails solid_queue:installを呼び出すと、config/environments/production.rbファイルにconfig.solid_queue.connects_to = { database: { writing: :queue } }が自動的に追加されるため、追加設定は不要です(ただし、database.ymlqueueオプションで指定したキュー名と一致していなければなりません)。また、config/database.ymlファイルでデータベースとして利用している名前についても、従来と同様にconfig.solid_queue.connects_toで利用している名前と一致させてください。

設定が終わったら、db:prepareを実行して、データベースの作成とスキーマ読み込みが行われるようにしてください。

これで、作業しているサーバーでbin/jobsを実行すれば、ジョブの処理を開始する準備が整います。これにより、デフォルト設定を用いてすべてのキューのジョブの処理が開始されます。Solid Queueの設定について詳しくは後述します。

小規模プロジェクトの場合は、Webサーバーと同じマシンでSolid Queueを実行できます。スケーリングの準備ができたら、Solid Queueはすぐに利用可能な水平スケーリングをサポートします。
Solid QueueをWebサーバーと別のサーバーで実行することも、複数のマシンで同時にbin/jobsを実行することも可能です。設定を変更すれば、一部のマシンのみをディスパッチャ専用やワーカー専用として実行することも可能です。詳しくは、後述の設定セクションを参照してください。

: 今後のスキーマ変更は、通常のマイグレーションで行うことになります。

🔗 単一データベースによる設定方法

Solid Queueは別のデータベースで実行することが推奨されていますが、1個のデータベースをアプリとキューの両方で利用することも可能です。その場合は以下の手順を実行します。

  1. db/queue_schema.rbの内容を通常のマイグレーションファイルにコピーし、db/queue_schema.rbを削除します。
  2. production.rbconfig.solid_queue.connects_toを削除します。
  3. データベースのマイグレーションを実行することで、bin/jobsを実行可能になります。

データベースが複数ではないため、database.ymlにプライマリデータベースとキューデータベースを設定する必要はありません。

🔗 Solid Queueを段階的に導入する場合

ジョブを1件ずつ切り替える形でSolid Queueを段階的に導入する計画がある場合は、既存のバックエンドにあるconfig.active_job.queue_adapterは変更せずに、移行するジョブで以下のようにqueue_adapterを直接指定することで可能になります。

# app/jobs/my_job.rb

class MyJob < ApplicationJob
  self.queue_adapter = :solid_queue
  # ...
end

🔗 高パフォーマンスで必要な要件

Solid QueueはFOR UPDATE SKIP LOCKEDをサポートしているため、MySQL 8以降やPostgreSQL 9.5以降で利用した場合に最大のスループットが得られるように設計されています。これらより古いバージョンでも利用可能ですが、その場合は同じキューに対して複数のワーカーを実行するとロック待機が発生する可能性があります。小規模アプリケーションであればSQLiteでも利用可能です。

🔗 設定

🔗 ワーカー、ディスパッチャ、スケジューラ

Solid Queueには以下のようなアクターがあります。

ワーカー(worker)
実行可能になったジョブをキューから選択して処理する役目を担います。ワーカーはsolid_queue_ready_executionsテーブルで動作します。
ディスパッチャ(dispatcher)
今後実行するようスケジューリングされたジョブを選択してディスパッチする(dispatch: 振り分ける)役目を担います。ディスパッチ作業は、solid_queue_scheduled_executionsテーブルにあるジョブをsolid_queue_ready_executionsテーブルに移動してワーカーが拾い上げられるようにするというシンプルなものです。また、定期的なタスクの管理も担当しており、タスクを処理するためのジョブをスケジュールに沿ってディスパッチします。コンカレンシー制御に関連するメンテナンス作業もいくつか行います。
スケジューラ(scheduler)
定期的なタスクを管理し、時間になったらジョブをエンキューします。
スーパバイザ(supervisor)
設定に基づいてワーカーやディスパッチャをforkし、ハートビートを制御し、必要に応じて停止や開始のシグナルを送信します。

Solid Queueのスーパーバイザは、監視対象のワーカー/ディスパッチャ/スケジューラーごとに個別のプロセスをforkします。

Solid Queueは、デフォルトではconfig/solid_queue.ymlにある設定ファイルの探索を試みますが、SOLID_QUEUE_CONFIG環境変数や-c/--config_fileオプションで探索パスを変更することも可能です。

bin/jobs -c config/calendar.yml

この設定ファイルは以下のような感じになります。

production:
  dispatchers:
    - polling_interval: 1
      batch_size: 500
      concurrency_maintenance_interval: 300
  workers:
    - queues: "*"
      threads: 3
      polling_interval: 2
    - queues: [ real_time, background ]
      threads: 5
      polling_interval: 0.1
      processes: 3

必須項目はなく、すべての項目はオプションです。設定がまったく指定されていない場合、Solid Queueはディスパッチャ1個とワーカー1個のデフォルト設定で実行されます。ディスパッチャやワーカーの一方だけを実行する場合は、以下のようにそのセクションのみを設定に含める必要があります。

production:
  dispatchers:
    - polling_interval: 1
      batch_size: 500
      concurrency_maintenance_interval: 300

上の設定ではディスパッチャが1個だけ実行され、ワーカーは実行されません。

さまざまなオプションの概要を以下に示します。

polling_interval
ワーカーとディスパッチャが追加ジョブをチェックする前に待機する時間を指定します(単位は秒)。デフォルト値は、ディスパッチャが1秒、ワーカーが0.1秒です。
batch_size
ディスパッチャがジョブをディスパッチするときのバッチサイズを指定します。デフォルトは500です。
concurrency_maintenance_interval
ブロック中のジョブがブロック解除可能になったかどうかをディスパッチャがチェックするまで待機する時間を指定します(単位は秒)。詳しくはコンカレンシー制御を参照してください。デフォルト値は600秒です。
queues
ワーカーがジョブを選択するキューのリストを指定します。*を指定すると、すべてのキューが対象となります(無指定の場合はこれがデフォルトです)。このオプションには単一のキューを指定することも、キューのリストを配列で指定することも可能です。
ジョブはこれらのキューから順にポーリングされるので、たとえば[ real_time, background ]を指定すると、real_timeで待機中のジョブがすべてなくなるまでbackgroundのジョブはワーカーに拾われなくなります。以下のようにワイルドカードを使って特定のキューにマッチするプレフィックスを指定することも可能です。
staging:
workers:
  - queues: staging*
    threads: 3
    polling_interval: 5

上の設定にすると、stagingで始まるすべてのキューからジョブをフェッチするワーカーが作成されます。このワイルドカード*は、キュー名の末尾に1個しか書けません。つまり*_some_queueのようなキュー名は指定できません(指定しても無視されます)。
最後に、[ staging*, background ]のように名前のプレフィックスと正確な名前を組み合わせることも可能です。順序に関する振る舞いは、正確な名前のみを組み合わせた場合と同じになります。

threads
各ワーカーがジョブを実行するのに必要な最大スレッドプール数です。個別のワーカーはこの個数以下のジョブをフェッチし、スレッドプールに配置して実行します。この設定はワーカーだけにあり、デフォルトは3です。
processes
指定の設定でスーパバイザによってforkされるワーカープロセス数の個数を指定します。デフォルトは1です(シングルプロセス)。この設定は、複数のCPUコアを1個のキュー(または同じ設定の複数のキュー)専用にしたい場合に便利です。この設定はワーカーだけにあります。
concurrency_maintenance
ディスパッチャでコンカレンシーのメンテナンスを行うかどうかを指定します(デフォルトではtrue)。この設定は、コンカレンシー制御を完全に無効にしたい場合や、ディスパッチャを複数実行している状態で一部のディスパッチャについては他の作業を行わずにディスパッチに専念させたい場合に便利です。

🔗 キューの順序と優先度

上述のように、あるワーカーでキューのリストをreal_time,backgroundのように指定すると、その順序でポーリングされます。この場合、real_timeに待機中のジョブがなくなるまでbackgroundのジョブはフェッチされなくなります。

Active Jobでは、ジョブをエンキューするときの優先度を正の整数で指定できます。Solid Queueは値が小さいほど優先度が高くなります。優先度のデフォルトは0です。

この設定は、重要度や緊急度が異なるジョブを同一のキューで実行する場合に便利です。
ただし優先度に基づいてジョブが選択されるのは同一キュー内の場合です。先ほどのreal_time,backgroundの例で説明すると、backgroundキューの優先度をより高くしても、real_timeキュー内のジョブは引き続きbackgroundキュー内のジョブよりも先にフェッチされます。

キューの順序と優先度を取り違えないためにも、指定するのはどちらか一方のみにとどめることをおすすめします。これにより、ジョブの実行順序が理解しやすくなります。

🔗 スレッド、プロセス、シグナル

Solid Queueのワーカーは、スレッドプールを用いて複数スレッドでの作業を実行します。これは上述のthreadsパラメータで設定できます。その他に、1台のマシン上の複数プロセス(ワーカーごとにprocessesパラメータで設定可能)や水平スケーリングによってパラレリズム(並列処理)を実現できます。

スーパバイザはこれらのプロセスを管理しており、以下のシグナルに応答します。

TERMINT
graceful shutdowを開始します。スーパバイザは監視対象プロセスにTERMシグナルを送信し、SolidQueue.shutdown_timeoutで指定された期間までプロセス終了を待機します。期間を過ぎても監視対象プロセスが残っている場合は、終了の必要があることを示すQUITシグナルを残りのプロセスに送信します。
QUIT
即時終了を開始します。スーパバイザは監視対象プロセスにQUITシグナルを送信し、プロセスをただちに終了させます。

QUITシグナルを受信したときに、ワーカーに実行中のジョブがまだ残っている場合、これらのジョブはプロセスが登録解除されるときにキューに戻されます。

プロセス終了前にクリーンアップする機会がなかった場合(ケーブルが抜けてしまったなど)、 実行中のプロセスによってジョブの実行がリクエスト中のままになる可能性があります。プロセスはハートビートを送信し、スーパバイザはハートビートが期限切れのプロセスをチェックしてクリーンアップし、リクエスト中のジョブをキューに戻します。ハートビートの送信頻度と、プロセス停止を判断するしきい値を設定できます。次のセクションを参照してください。

🔗 データベースを設定する

Solid Queueで利用するデータベースは、config/application.rbファイルまたはconfig/environments/production.rb環境設定ファイルのconfig.solid_queue.connects_toオプションで設定できます。デフォルトでは、インストール時に指定したデータベース設定に応じて、queueという名前の単一のデータベースが書き込みと読み出しの両方で使われます。

このとき、Active Recordのマルチデータベース用のオプションもすべて利用できます。

🔗 ライフサイクルのフック

Solid Queueでは、スーパーバイザのライフタイムの中で以下の2箇所のポイントにフックをかけられます。

start
スーパーバイザの起動完了後で、かつワーカーとディスパッチャをforkする直前のタイミング。
stop
シグナル(TERMINTQUIT)の受信後で、かつ正常なシャットダウンまたは即時シャットダウンを開始する直前のタイミング。

ワーカーのライフタイムでは、以下の2箇所のポイントにフックをかけられます。

worker_start
ワーカーの起動完了後で、かつポーリングループを開始する直前のタイミング。
worker_stop
シグナル(TERMINTQUIT)の受信後で、かつ正常なシャットダウンまたは即時シャットダウン(exit!のみ)を開始する直前のタイミング。

これらは、次のメソッドにブロックを渡すことで利用できます。

SolidQueue.on_start
SolidQueue.on_stop

SolidQueue.on_worker_start
SolidQueue.on_worker_stop

例:

SolidQueue.on_start { start_metrics_server }
SolidQueue.on_stop { stop_metrics_server }

これらは、フックを複数追加するために複数回呼び出すことも可能ですが、これらの呼び出しはSolid Queueが開始される前に行う必要があります。呼び出しはイニシャライザで行うのが適切でしょう。

🔗 その他の設定

注意: 本セクションの設定は、config/application.rbファイルまたは個別の環境設定ファイルでconfig.solid_queue.silence_polling = trueのように設定する必要があります。

Solid Queueの振る舞いを制御する以下の設定も利用できます。

logger
利用するロガーを指定します。デフォルトはアプリのロガーです。
app_executor
非同期操作をラップするのに使われるRails executorです。デフォルトではアプリのexecutorが使われます。
on_thread_error
発生した例外を引数として受け取るスレッド内でエラーが発生したときに呼び出す、カスタムlambda/Procを指定します。デフォルトは以下です。
ruby
-> (exception) { Rails.error.report(exception, handled: false) }

これは、ジョブ実行中にraiseしたエラーには使われません。ジョブでraiseしたエラーは、Active Jobのretry_ondiscard_onによって処理され、最終的には失敗したジョブになります。これは、Solid Queue自体でraiseしたエラー用です。
use_skip_locked
ロック読み取りの実行時にFOR UPDATE SKIP LOCKEDを利用するかどうかを指定します。この設定は将来自動検出される予定です。現在は、FOR UPDATE SKIP LOCKEDをサポートしていないデータベース(MySQL 8未満、PostgreSQL 9.5未満)では、この設定をfalseにするだけで済みます。SQLiteはシーケンシャル書き込みを行うので、この設定は無効です。
process_heartbeat_interval
すべてのプロセスが従うべきハートビート間隔です。デフォルトは60秒です。
process_alive_threshold
直近のハートビート後にプロセス停止とみなすまでの待ち時間です。デフォルトは5分です。
shutdown_timeout
TERMを送信したスーパバイザが、QUITを送信して即時終了を要求するまでの待ち時間です。デフォルトは5秒です。
silence_polling
ワーカーやディスパッチャをポーリングするときにActive Recordのログ出力を抑制するかどうかを指定します。デフォルトはtrueです。
supervisor_pidfile
起動時にスーパバイザが作成するpidfileへのパスです。起動時に同じホスト内で複数のスーパバイザが実行されないようにする場合や、ヘルスチェック用途に利用できます。デフォルトはnilです。
preserve_finished_jobs
完了したジョブをsolid_queue_jobsテーブルに残すかどうかを指定します。デフォルトはtrueです。
clear_finished_jobs_after
preserve_finished_jobsがtrueの場合に、完了したジョブを保持する期間を指定します。注意: 現時点では、完了したジョブの自動クリーンアップ機能はありません。ジョブをクリーンアップするにはSolidQueue::Job.clear_finished_in_batchesを定期的に呼び出す必要がありますが、これは近い将来自動化される予定です。
default_concurrency_control_period
コンカレンシー制御パラメータのデフォルト値として使われます。デフォルト値は3.minutesです。
enqueue_after_transaction_commit
ジョブをエンキューするタイミングをafter_commitフックの後にするかどうかを定義します(デフォルトはtrue)。詳しくは#51426を参照してください。

🔗 エンキュー時のエラー

Solid Queueは、ジョブをエンキューするときに発生するActive Recordエラーに対してSolidQueue::Job::EnqueueErrorをraiseします。
ActiveJob::EnqueueErrorをraiseしない理由は、このエラーはActive Jobによって処理されるので、perform_laterに渡す必要のなるブロック内でジョブをyieldするためには、perform_laterfalseを返してjob.enqueue_errorを設定する必要があるためです。これは独自のジョブには非常にうまく機能しますが、Rails(またはTurbo::Streams::BroadcastJobActiveStorage::AnalyzeJobなどの他のgem)によってエンキューされたジョブの場合は、perform_laterの呼び出しを制御できないため、失敗の処理が非常に難しくなります。

定期実行されるタスクの場合は、タスクに対応するジョブをエンキューするときにこのようなエラーがraiseすると、処理とログ出力は行われますが、エラーが上位に伝達されません。

🔗 コンカレンシー制御

Solid Queueは、Active Jobをコンカレンシー制御で拡張することで、特定の型や特定の引数を持つジョブの最大同時実行可能数を制限できます。この方法で制限をかけるとジョブの実行はブロックされ、別のジョブが終了してブロックが解除されるか、設定されている有効期間(コンカレンシー制限のduration)を過ぎるまで、ブロックされたままになります。ジョブはブロックされることはあっても、破棄されたり失われたりすることはありません。

class MyJob < ApplicationJob
  limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group

  # ...
key
唯一の必須パラメータであり、ジョブ引数をパラメータとして受け取ります。渡すジョブ引数は、制限する必要のあるジョブを識別するのに用いられ、「シンボル」「文字列」「proc」のいずれかを渡すことが可能です。procがActive Recordのレコードを返す場合、キーはそのクラス名とidからビルドされます。
to
デフォルト値は1です。
duration
はデフォルトでSolidQueue.default_concurrency_control_periodの値(デフォルト値は3.minutes)が設定されますが、設定の変更も可能です。
group
異なるジョブクラスをグループ化してコンカレンシーを制御するのに使います。デフォルトではジョブクラス名が使われます。

これらの制御がジョブに含まれている場合、同じkeyを生成するジョブは最大数(toで指定される)までコンカレントに実行されるようになり、この保証はキューに入れられた各ジョブのdurationの間継続します。しかしジョブの実行順序については保証されず、保証されるのは同時に(重複)実行される ジョブについてのみである点にご注意ください。

コンカレンシーの制限では、エンキュー時にセマフォの概念が利用されます。これは次のように機能します。
ジョブがエンキューされると、コンカレンシー制御が指定されているかどうかを確認します。指定されている場合は、計算された同時実行キーのセマフォを確認します。
セマフォがオープンの場合は、セマフォをリクエストしてジョブをreadyに設定します。ready とは、ワーカーがそれを取得して実行できることを意味します。ジョブの実行が終了すると (成功または失敗して実行が失敗した場合)、セマフォにシグナルを送信して、同じキーを持つ次のジョブがあれば、そのブロック解除を試みます。

ここで、次のジョブのブロックを解除することは、そのジョブをただちに実行するという意味ではなく、ジョブがblockedからreadyに移行するという意味です。最初のジョブがセマフォを解放したときに、次のジョブのブロック解除が何らかの形で妨げられる可能性があるため(例: ワーカーが実行されているマシンの電源プラグを誰かが抜いた場合)、フェイルセーフとしてdurationが用意されています。
期間を超えてブロックされているジョブは解放の候補になりますが、各ジョブが「セマフォダンスチェック」をパスしなければならないため、コンカレンシールールで許可されている数だけ解放されます。つまり、durationは、実際にはエンキューされたジョブや実行中のジョブに関するものではなく、待機中にブロックされたジョブに関するものです。

以下の例を見てみましょう。

class DeliverAnnouncementToContactJob < ApplicationJob
  limits_concurrency to: 2, key: ->(contact) { contact.account }, duration: 5.minutes

  def perform(contact)
    # ...

上のcontactaccountActiveRecordのレコードです。この場合、同じaccountに対するDeliverAnnouncementToContactという種類のジョブが最大2個コンカレントに実行されるようになります。
何らかの理由でジョブの1つが5分以上かかった場合や、ジョブを取得してから5分以内にコンカレンシーロックが解放されなかった場合は、同じキーを持つ別の新しいジョブがロックを取得する可能性があります。

groupの別の利用例を見てみましょう。

class Box::MovePostingsByContactToDesignatedBoxJob < ApplicationJob
  limits_concurrency key: ->(contact) { contact }, duration: 15.minutes, group: "ContactActions"

  def perform(contact)
    # ...
class Bundle::RebundlePostingsJob < ApplicationJob
  limits_concurrency key: ->(bundle) { bundle.contact }, duration: 15.minutes, group: "ContactActions"

  def perform(bundle)
    # ...

上の場合、contactレコード(idは123)用のBox::MovePostingsByContactToDesignatedBoxJobというジョブと、bundleレコード(contactレコード123を参照している)用のBundle::RebundlePostingsJobという別のジョブが同時にエンキューされると、処理の続行を許されるジョブはどちらか1つだけとなります。続行を許されなかった方のジョブは、最初のジョブが完了するまで(または最初に何が起ころうと15分経過するまで)ブロックされたままになります。

ここで注意が必要なのは、durationの設定は、ディスパッチャで設定したconcurrency_maintenance_intervalの値に間接的に依存している点です(この値は、ブロックされたジョブをチェックしてブロックを解除する作業の頻度を指定するからです)。durationの値は、一般に「すべてのジョブがその期間内で適切に完了する」ように設定すべきであり、コンカレンシーのメンテナンスタスクは、あくまで「何か異常が発生したときのフェイルセーフ」として考える必要があります。

ジョブのブロック解除は優先度順に行われますが、ジョブのブロック解除ではキューの順序は考慮されません。つまり、コンカレンシーグループを共有しているジョブのグループが別のキューにも存在する場合や、同じクラスのジョブが別のキューにもエンキューされている場合は、ブロックされたジョブのブロック解除時に、ワーカーに設定したキューの順序は考慮されません。
その理由は、実行中のジョブが次のジョブのブロックを解除するときに、そのジョブ自身が特定のワーカーのキューの順序を認識していなければ(キュー順序の異なるワーカーが複数存在する可能性すらあります)、優先度しか認識しようがないためです。ブロックされたジョブがブロック解除されてポーリング可能になると、キューの順序に沿ってワーカーによって取得されます。

最後に、自動または手動で再試行されるようになっているジョブが失敗すると、その失敗したジョブは、新たにエンキューされる別のジョブと同じように扱われます。つまり、どちらもエンキューされてロックを取得しようとし、ロックを取得すれば実行されます。ジョブが過去にロックを既に取得していたかどうかは関係ありません。

🔗 失敗したジョブとリトライ

Solid Queueには自動リトライのしくみはなく、自動リトライについてはActive Jobに依存しています。「失敗した実行」はシステム内で保持され、そうしたジョブ用にsolid_queue_failed_executionsテーブルのレコードが作成されます。このジョブは、手動で破棄するか再度エンキューするまで残り続けます。
ジョブの破棄やリトライは以下のようにコンソールで実行できます。

failed_execution = SolidQueue::FailedExecution.find(...) # 自分のジョブに関連する失敗した実行を検索する
failed_execution.error   # エラーを調べる

failed_execution.retry   # 初回と同様にジョブを新たにエンキューする
failed_execution.discard # ジョブをシステムから削除する

ただし、失敗したジョブを調べてリトライや破棄を実行できるmission_control-jobsというダッシュボードがあるので、チェックすることをおすすめします。

basecamp/mission_control-jobs - GitHub

🔗 ジョブのエラーを報告する

Railsと統合されている一部のエラートラッキングサービス(SentryやRollbarなど)は、Active Jobにフックする形で、ジョブ実行中に発生した未処理のエラーを自動的に報告します。ただし、エラートラッキングシステムがこれを行わない場合や、またはカスタムレポートが必要な場合は、以下の方法でActive Jobに手動でフックすることが可能です。

# application_job.rb
class ApplicationJob < ActiveJob::Base
  rescue_from(Exception) do |exception|
    Rails.error.report(exception)
    raise exception
  end
end

上のロジックを複製してActionMailer::MailDeliveryJobにも配置する必要があることにご注意ください。これは、ActionMailerApplicationJobを継承せず、代わりに ActiveJob::Baseから継承したActionMailer::MailDeliveryJobを利用するためです。

# application_mailer.rb

class ApplicationMailer < ActionMailer::Base
  ActionMailer::MailDeliveryJob.rescue_from(Exception) do |exception|
    Rails.error.report(exception)
    raise exception
  end
end

🔗 Pumaプラグイン

Solid QueueのスーパバイザをPumaと一緒に実行して、Pumaで監視や管理を行いたい場合に利用できるPumaプラグインを提供しています。puma.rb設定ファイルに以下を追加するだけで利用できます。

plugin :solid_queue

🔗 ジョブとトランザクションの整合性について

警告:
ジョブをアプリケーションデータと同じACID準拠のデータベースに配置すると、強力なツールが有効になります。つまり、トランザクション整合性を利用して、ジョブがコミットされない限りアプリ内の特定のアクションもコミットされないようにする(およびその逆)ことが可能になります。また、エンキューされたトランザクションがコミットされるまで、ジョブがエンキューされないようにします。これは非常に強力かつ有用ですが、将来Active Jobのバックエンドを別のものに差し替えたり、Solid Queueを単に専用のデータベースに移動して振る舞いが急に変更されたりすると、逆効果になる可能性があります。

これは非常に扱いにくくなる可能性があるのと、ほとんどの人はこの点について心配する必要はないため、デフォルトのSolid Queueはメインアプリとは別のデータベースに設定され、進行中のトランザクションがコミットされるまでジョブのエンキューは延期されます(Active Job組み込みの機能が使われます)。つまり、Solid Queueをアプリと同一のデータベースで実行したとしても、このトランザクションの整合性は利用できません。

この振る舞いを変更したい場合は、config.active_job.enqueue_after_transaction_commitneverに設定できます。このオプションはジョブごとに設定することも可能です。

この設定をneverに変更したときにもトランザクション整合性に悪影響が生じないようにしたい場合は、以下の操作で可能になります。

  • 特定レコードに依存するジョブは、常にafter_commitコールバックでエンキューするか、ジョブが使うデータが「ジョブがエンキューされる前に」確実にコミットされる場所でエンキューする。
  • Solid Queueがアプリと別のデータベースを使うように設定を変更し、アプリケーションのリクエストやジョブ実行を処理するスレッド上で「別のコネクションが使われる」ようにSolid Queueのデータベースを設定する(例↓)

class ApplicationRecord < ActiveRecord::Base
  self.abstract_class = true

  connects_to database: { writing: :primary, reading: :replica }
  ...
config.solid_queue.connects_to = { database: { writing: :primary, reading: :replica } }

🔗 定期的なタスク

Solid Queueは、将来の特定の時刻になると定期的に実行される、cronジョブのような繰り返しタスクの定義をサポートします。これらのタスクはスケジューラのプロセスによって管理され、独自の設定ファイルで定義されます。
デフォルトでは、ファイルはconfig/recurring.ymlに配置されますが、SOLID_QUEUE_RECURRING_SCHEDULE環境変数 を使うか、以下のようにbin/jobsコマンドに--recurring_schedule_fileオプションを指定することで、このパスを変更可能です。

bin/jobs --recurring_schedule_file=config/schedule.yml
production:
  a_periodic_job:
    class: MyJob
    args: [ 42, { status: "custom_status" } ]
    schedule: every second
  a_cleanup_task:
    command: "DeletedStuff.clear_all"
    schedule: every day at 9am

タスクはハッシュ(辞書)形式で指定され、ハッシュのキーは内部でタスクのキーになります。各タスクには、class(エンキューされるジョブクラスとなる)を指定するか、command(スケジュールに沿ってsolid_queue_recurringキューにエンキューされるジョブ (SolidQueue::RecurringJob)のコンテキストで評価される`)を指定する必要があります。

各タスクにはスケジュールも必要です。スケジュールはFugitで解析されるため、Fugitにcronとして渡せる引数はすべて渡せます。
各タスクには以下のオプションも指定できます。

args
ジョブに渡す引数。「単一の引数」「ハッシュ」「引数の配列」のいずれかを指定(配列を渡す場合は、末尾の要素にキーワード引数を含めることも可能です)。

上の設定例のジョブは、次のように毎秒(1秒に1回)エンキューされます。

MyJob.perform_later(42, status: "custom_status")
queue
ジョブをエンキューするのに使う別のキューを指定します。ない場合は、そのジョブクラス用に設定されたキューが使われます。
priority
ジョブをエンキューするのに使われる優先度の数値を指定します。

タスクは、指定の時刻になると、そのタスクを所有するディスパッチャによってエンキューされ、各タスクは次のタスクをスケジューリングします。この手法は、good_job gemの機能から多くのインスピレーションを得ています。

同一のrecurring_tasksを利用して複数のスケジューラを実行することも可能です。重複したタスクが同時にエンキューされるのを防ぐために、ジョブがエンキューされるのと同じトランザクション内に新しくsolid_queue_recurring_executionsテーブルが作成されます。
個別のタスクで1度に1個のエントリだけが作成されることを保証するために、このテーブルのtask_keyrun_atにunique indexが設定されます。この機能は、preserve_finished_jobstrueに設定した場合にのみ有効で(デフォルトではtrue)、この保証はジョブが維持されている限り適用されます。

: 単一の定期的スケジュールはサポートされているので、同一スケジュールを利用するスケジューラを複数使うことは可能ですが、設定の異なるスケジュールを複数利用することはできません。

最後に、Solid Queueで処理しないジョブを構成することも可能です。これは、アプリで以下のような形でジョブを実行するだけでできます。

class MyResqueJob < ApplicationJob
  self.queue_adapter = :resque

  def perform(arg)
    # ..
  end
end

Solid Queueで以下のコンフィグを利用することも可能です。

  dispatchers:
    - recurring_tasks:
        my_periodic_resque_job:
          class: MyResqueJob
          args: 22
          schedule: "*/5 * * * *"

この場合、ジョブはperform_laterによってエンキューされるので、Resqueで実行されます。ただし、このジョブはどのsolid_queue_recurring_executionレコードでもトラッキングされなくなるので、ジョブのエンキューが1回限りであることは保証されなくなります。

🔗 インスパイアされたプロジェクト

Solid Queueは、resqueGoodJobにインスパイアされています。これらのプロジェクトは私たちが多くを学んだ素晴らしい事例なので、ぜひチェックしてみてください。

resque/resque - GitHub

bensheldon/good_job - GitHub

🔗 License

このgemは、MIT Licenseに基づいてオープンソースとして利用可能です。

関連記事

Rails APIドキュメント: Active Recordのトランザクション(翻訳)

Rails API: ActiveRecord::NestedAttributes(翻訳)


CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。