発端
Django + Redis(時に ElastiCashe)+ Celery の組み合わせは、よく使います。
- 非同期処理は Python スレッドは禁止(結果として重い物を流すと gunicorn/uWSGI の反応がなくなるので)。Celery workerを使う。
- cron ジョブ的なバッチ処理も、Celery beat で書く。
という感じでやってます。
簡単に書けるのですが、流れたかどうか知りたい場合、Celery worker が1台だけならログファイルを見ればいいとして、複数台になってくると、どこで流れたんだ、という所から調べるのが大変。
何を依頼して、何が失敗したか知りたい
ということで、依頼 -> 成功時/失敗時の実行ログを RDB のモデルに出して、adminサイトで確認できるようにします。
もちろん、Celery worker が1台だけの場合でも、ログファイルを追うより楽になります。
既に Celery Task をいっぱい書いてしまっているので、なるべく簡単に実現したい。
継承元のTask
通常は、from celery import Task
で書き始めますが、エラーハンドリング対応の Task を書くことにします。
呼び出し側は、delay() の次に呼ばれる apply_async() で依頼時のログを出しています。
呼ばれた側は、on_success() で正常時、on_failure() で異常時のログを出しています。
使い方は、SampleTask のように継承元を Task -> BaseHandlingTask に変えればOK。
ちなみに、delay() の非同期で作っておいて、やっぱり同期でということで、run() に直した場合は何も記録されません。
app/tasks/base/handling.py
import json
import logging
import socket
import sys
import traceback
from celery import Task
from django.db import transaction
from app.models import JobState
class BaseHandlingTask(Task):
"""エラー ハンドリングを行うベースタスク"""
logger = logging.getLogger('prj')
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, **options):
"""Apply tasks asynchronously by sending a message."""
async_result = None
try:
async_result = super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
except: # Redis が上がっていない等
# 呼び出し側が atomic ブロック内だと、一緒に rollback されてしまうので、2つめの接続を使う
with transaction.atomic(using='force'):
exc_type, exc_value, exc_traceback = sys.exc_info()
job_state = JobState() # task_id はない
job_state.task_name = self.name
job_state.name = self.name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
job_state.origin = socket.gethostname()
job_state.exception_class = exc_value.__class__.__name__
job_state.exception_msg = exc_value.args[0] if exc_value.args else exc_type.__module__ + '.' + exc_type.__name__
job_state.traceback = traceback.format_exc()
job_state.save(using='force')
raise
# 起動 成功時 - Redis は上がっている。Celery は上がっている/上がっていない(is_complete == False のまま残る)
job_state, is_created = JobState.objects.get_or_create(task_id=async_result.id)
job_state.task_name = async_result.task_name
job_state.name = async_result.task_name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
job_state.origin = socket.gethostname()
job_state.save()
return async_result
def on_success(self, retval, task_id, args, kwargs):
"""Success handler - 正常時のハンドラー - Celery worker 側で呼ばれる"""
job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
job_state.task_name = self.name
job_state.name = self.name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
if self.request.get('origin'):
job_state.origin = self.request.get('origin')
job_state.hostname = self.request.get('hostname')
job_state.is_complete = True
job_state.save()
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Error handler - エラー時のハンドラー - Celery worker 側で呼ばれる
- run() の atomic ブロック内であっても、別途 worker から呼ばれるので、記録は rollback されない
"""
job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
job_state.task_name = self.name
job_state.name = self.name.split('.')[-1]
if args:
job_state.args = json.dumps(list(args))
if kwargs:
job_state.kwargs = json.dumps(kwargs)
if self.request.get('origin'):
job_state.origin = self.request.get('origin')
job_state.hostname = self.request.get('hostname')
job_state.exception_class = exc.__class__.__name__
job_state.exception_msg = str(exc)
job_state.traceback = str(einfo)
job_state.save()
def run(self, *args, **kwargs):
"""The body of the task executed by workers."""
raise NotImplementedError('Tasks must define the run method.')
class SampleTask(BaseHandlingTask):
"""サンプル タスク"""
logger = logging.getLogger('prj')
def run(self, is_error=False):
self.logger.info('SampleTask start...')
with transaction.atomic():
if is_error:
raise ValueError('エラーです')
self.logger.info('SampleTask end.')
JobState モデル
ログの出力先のモデルです。
- タスク名
- 依頼時の引数
- どのホストから起動されたか
- どのホストで実行されたか
- 正常に完了したか
- Exception 時は、例外のクラス/メッセージ/traceback
がわかるようになっています。
app/models/job_state.py
from django.db import models
class JobState(models.Model):
"""ジョブ状態"""
task_id = models.CharField('タスクID', max_length=255, blank=True, null=True, db_index=True) # UUID
task_name = models.CharField('タスク名', max_length=255, blank=True, null=True) # 例: app.tasks.handling.SampleTask
name = models.CharField('クラス名', max_length=255, blank=True, null=True) # 例: SampleTask
args = models.TextField('args', null=True, blank=True)
kwargs = models.TextField('kwargs', null=True, blank=True)
is_complete = models.BooleanField('完了', default=False)
origin = models.CharField('origin', max_length=255, blank=True, null=True) # Name of host that sent this task.
hostname = models.CharField('hostname', max_length=255, blank=True, null=True) # Node name of the worker instance executing the task.
exception_class = models.CharField('例外クラス', max_length=255, null=True, blank=True, default='')
exception_msg = models.CharField('例外メッセージ', max_length=255, null=True, blank=True, default='')
traceback = models.TextField('traceback', null=True, blank=True, default='')
created_at = models.DateTimeField('登録日時', auto_now_add=True, blank=True, null=True)
updated_at = models.DateTimeField('更新日時', auto_now=True, blank=True, null=True)
def __str__(self):
return self.task_id if self.task_id else str(self.id)
adminサイト登録
adminサイトで JobState を見れるようにします。
そのままだと長くなるので、所々短縮してたりします。
app/admin.py
from django.contrib import admin
from app.models import JobState
class JobStateAdmin(admin.ModelAdmin):
"""ジョブ状態"""
list_display = ('id', 'task_id_shorten', 'name', 'x_args', 'x_kwargs', 'is_complete', 'exception_class', 'origin', 'hostname', 'created_at')
list_display_links = ('id', 'task_id_shorten', 'name')
list_filter = ('is_complete',)
search_fields = ['task_id', 'task_name', 'name']
def task_id_shorten(self, obj):
return obj.task_id[:8] + '...' if obj.task_id else ''
task_id_shorten.short_description = 'タスクID'
def x_args(self, obj):
return obj.args[:20] + '...' if obj.args and len(obj.args) > 20 else obj.args
x_args.short_description = 'args'
def x_kwargs(self, obj):
return obj.kwargs[:20] + '...' if obj.kwargs and len(obj.kwargs) > 20 else obj.kwargs
x_kwargs.short_description = 'kwargs'
admin.site.register(JobState, JobStateAdmin)
settings
呼び出し側が transaction.atomic() だと、異常時に一緒に JobState も rollback されてしまうので、ちょっと強引ですが、もう一つ強制書き込み用の Database 接続を増やしてます。
そこまでしなくていいよ、という場合はお好みで。
DATABASE = {} の下に、以下を追加
prj/settings/local.py
# atmic ブロック内で Exception しても、JobState には書き込みできるよう、同じ設定で2つ目の接続を作る
DATABASES.update({'force': DATABASES['default']})
以上
荒削りのコードですが、一目瞭然で見れるようになりました。
正常時も含めると多すぎるという場合は、正常時に出している所をコメントアウトする。
また、cron ジョブ(Celery beat)で15分単位で流れるものは、これも多すぎるので、そもそも BaseHandlingTask を継承しないで、素の Task を継承する。
等で調整して下さい。