Celery 一定時間ごとに処理を繰り返す(beat_schedule)

Celeryで一定間隔ごとに処理を実行したい場合、
beat_scheduleの機能を使用するのが便利そうです。
Periodic Tasks


Time Zones


時刻を指定しての実行を想定している場合は、タイムゾーンを設定に追記しておきます。

timezone = 'Asia/Tokyo'





プログラムによる設定


プログラムで定期実行するタスクを指定する場合は、
on_after_configure.connect
イベントをフックして設定します。

  1. from celery import Celery
  2. app = Celery('proj')
  3. # 設定ファイルを読み込み
  4. app.config_from_object('celeryconfig')
  5. @app.on_after_configure.connect
  6. def setup_periodic_tasks(sender: Celery, **kwargs):
  7.     # Calls test('hello') every 10 seconds.
  8.     sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
  9. @app.task
  10. def test(arg):
  11.     print(arg)
  12. if __name__ == '__main__':
  13.     app.start()



ここでワーカーを起動するのですが、オプションを指定し、
celery beatを有効にして起動する必要があります。

beatのみ有効な状態で起動する場合

$ celery -A proj beat



beatを有効にしつつワーカーも起動する場合

$ celery -A proj worker -B



beatを有効にした場合は、最後に実行した時間を記録するcelerybeat-scheduleファイルが
実行ディレクトリに生成されます。
出力パスを変更する場合は-sオプションでフルパスを指定します。

$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule





app.conf.beat_schedule


app.conf.beat_schedule で実行するタスクを指定することもできます。
先程の例をbeat_scheduleで書き換えたものはこちらになります。

  1. from celery import Celery
  2. app = Celery('proj')
  3. # 設定ファイルを読み込み
  4. app.config_from_object('celeryconfig')
  5. app.conf.beat_schedule = {
  6.     'add-every-10-seconds': {
  7.         'task': 'proj.celery.test',
  8.         'schedule': 10.0,
  9.         'args': ("hello",)
  10.     },
  11. }
  12. @app.task
  13. def test(arg):
  14.     print(arg)
  15. if __name__ == '__main__':
  16.     app.start()



ポイントは、
taskの指定はフルネーム(ワーカー起動時に[tasks]に表示される名称を指定)
argsの指定はtupleかlist
文字列を指定したい場合は、例のようにタプルに文字列を指定する



設定ファイルでの指定


設定ファイルに記載する場合はbeat_scheduleの項目を作成し記載します。
https://docs.celeryq.dev/en/latest/userguide/configuration.html#beat-settings-celery-beat

・celeryconfig.py

  1. broker_url = 'amqp://myuser:[email protected]:5672/myvhost'
  2. result_backend = 'cache+memcached://192.168.11.202:11211/'
  3. include = ['proj.tasks']
  4. result_expires = 3600
  5. broker_connection_retry_on_startup = True
  6. timezone = 'Asia/Tokyo'
  7. beat_schedule = {
  8.     'add-every-10-seconds': {
  9.         'task': 'proj.celery.test',
  10.         'schedule': 10.0,
  11.         'args': ("hello",)
  12.     },
  13. }




relative


これまで10秒間隔でタスクを実行する例を試してみました。
起動間隔は10秒ですが、ワーカーを開始した時間により微妙な秒数に実行されます。

00秒、10秒、20秒...ときりのよい時間に実行したい場合は、
relative = True
を指定してやります。

  1. beat_schedule = {
  2.     'add-every-10-seconds': {
  3.         'task': 'proj.tasks.add',
  4.         'schedule': 10.0,
  5.         'args': (1, 2),
  6.         'relative': True
  7.     },
  8. }





crontab形式での指定


scheduleの項目はcrontab形式でも指定できるようです。

  1. from celery.schedules import crontab
  2. app.conf.beat_schedule = {
  3.     # Executes every Monday morning at 7:30 a.m.
  4.     'add-every-monday-morning': {
  5.         'task': 'tasks.add',
  6.         'schedule': crontab(hour=7, minute=30, day_of_week=1),
  7.         'args': (16, 16),
  8.     },
  9. }



関連記事

コメント

プロフィール

Author:symfo
blog形式だと探しにくいので、まとめサイト作成中です。
https://symfo.web.fc2.com/

PR

検索フォーム

月別アーカイブ