Celery 一定時間ごとに処理を繰り返す(beat_schedule)
Celeryで一定間隔ごとに処理を実行したい場合、beat_scheduleの機能を使用するのが便利そうです。
Periodic Tasks
Time Zones
時刻を指定しての実行を想定している場合は、タイムゾーンを設定に追記しておきます。
timezone = 'Asia/Tokyo'
プログラムによる設定
プログラムで定期実行するタスクを指定する場合は、
on_after_configure.connect
イベントをフックして設定します。
- from celery import Celery
- app = Celery('proj')
- # 設定ファイルを読み込み
- app.config_from_object('celeryconfig')
- @app.on_after_configure.connect
- def setup_periodic_tasks(sender: Celery, **kwargs):
- # Calls test('hello') every 10 seconds.
- sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
- @app.task
- def test(arg):
- print(arg)
- if __name__ == '__main__':
- app.start()
ここでワーカーを起動するのですが、オプションを指定し、
celery beatを有効にして起動する必要があります。
beatのみ有効な状態で起動する場合
$ celery -A proj beat
beatを有効にしつつワーカーも起動する場合
$ celery -A proj worker -B
beatを有効にした場合は、最後に実行した時間を記録するcelerybeat-scheduleファイルが
実行ディレクトリに生成されます。
出力パスを変更する場合は-sオプションでフルパスを指定します。
$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
app.conf.beat_schedule
app.conf.beat_schedule で実行するタスクを指定することもできます。
先程の例をbeat_scheduleで書き換えたものはこちらになります。
- from celery import Celery
- app = Celery('proj')
- # 設定ファイルを読み込み
- app.config_from_object('celeryconfig')
- app.conf.beat_schedule = {
- 'add-every-10-seconds': {
- 'task': 'proj.celery.test',
- 'schedule': 10.0,
- 'args': ("hello",)
- },
- }
- @app.task
- def test(arg):
- print(arg)
- if __name__ == '__main__':
- app.start()
ポイントは、
taskの指定はフルネーム(ワーカー起動時に[tasks]に表示される名称を指定)
argsの指定はtupleかlist
文字列を指定したい場合は、例のようにタプルに文字列を指定する
設定ファイルでの指定
設定ファイルに記載する場合はbeat_scheduleの項目を作成し記載します。
https://docs.celeryq.dev/en/latest/userguide/configuration.html#beat-settings-celery-beat
・celeryconfig.py
- broker_url = 'amqp://myuser:[email protected]:5672/myvhost'
- result_backend = 'cache+memcached://192.168.11.202:11211/'
- include = ['proj.tasks']
- result_expires = 3600
- broker_connection_retry_on_startup = True
- timezone = 'Asia/Tokyo'
- beat_schedule = {
- 'add-every-10-seconds': {
- 'task': 'proj.celery.test',
- 'schedule': 10.0,
- 'args': ("hello",)
- },
- }
relative
これまで10秒間隔でタスクを実行する例を試してみました。
起動間隔は10秒ですが、ワーカーを開始した時間により微妙な秒数に実行されます。
00秒、10秒、20秒...ときりのよい時間に実行したい場合は、
relative = True
を指定してやります。
- beat_schedule = {
- 'add-every-10-seconds': {
- 'task': 'proj.tasks.add',
- 'schedule': 10.0,
- 'args': (1, 2),
- 'relative': True
- },
- }
crontab形式での指定
scheduleの項目はcrontab形式でも指定できるようです。
- from celery.schedules import crontab
- app.conf.beat_schedule = {
- # Executes every Monday morning at 7:30 a.m.
- 'add-every-monday-morning': {
- 'task': 'tasks.add',
- 'schedule': crontab(hour=7, minute=30, day_of_week=1),
- 'args': (16, 16),
- },
- }
- 関連記事
-
- Celery タスクごとに並列数を変更する
- Celery モニタリングツール flower 2.0のインストールと表示内容
- Celery 一定時間ごとに処理を繰り返す(beat_schedule)
- Celery 異なるタスクを並列で実行し、結果を異なるタスクに渡して実行する
- Celery 5.4 ログの出力について
コメント