12
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Celeryの実行ログを取る

Last updated at Posted at 2017-03-21

発端

Django + Redis(時に ElastiCashe)+ Celery の組み合わせは、よく使います。

  • 非同期処理は Python スレッドは禁止(結果として重い物を流すと gunicorn/uWSGI の反応がなくなるので)。Celery workerを使う。
  • cron ジョブ的なバッチ処理も、Celery beat で書く。

という感じでやってます。

簡単に書けるのですが、流れたかどうか知りたい場合、Celery worker が1台だけならログファイルを見ればいいとして、複数台になってくると、どこで流れたんだ、という所から調べるのが大変。

何を依頼して、何が失敗したか知りたい

ということで、依頼 -> 成功時/失敗時の実行ログを RDB のモデルに出して、adminサイトで確認できるようにします。

もちろん、Celery worker が1台だけの場合でも、ログファイルを追うより楽になります。

既に Celery Task をいっぱい書いてしまっているので、なるべく簡単に実現したい。

継承元のTask

通常は、from celery import Task で書き始めますが、エラーハンドリング対応の Task を書くことにします。

呼び出し側は、delay() の次に呼ばれる apply_async() で依頼時のログを出しています。
呼ばれた側は、on_success() で正常時、on_failure() で異常時のログを出しています。

使い方は、SampleTask のように継承元を Task -> BaseHandlingTask に変えればOK。

ちなみに、delay() の非同期で作っておいて、やっぱり同期でということで、run() に直した場合は何も記録されません。

app/tasks/base/handling.py

import json
import logging
import socket
import sys
import traceback
from celery import Task
from django.db import transaction

from app.models import JobState


class BaseHandlingTask(Task):
    """エラー ハンドリングを行うベースタスク"""
    logger = logging.getLogger('prj')

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, **options):
        """Apply tasks asynchronously by sending a message."""
        async_result = None
        try:
            async_result = super().apply_async(args, kwargs, task_id, producer, link, link_error, **options)
        except:     # Redis が上がっていない等
            # 呼び出し側が atomic ブロック内だと、一緒に rollback されてしまうので、2つめの接続を使う
            with transaction.atomic(using='force'):
                exc_type, exc_value, exc_traceback = sys.exc_info()
                job_state = JobState()  # task_id はない
                job_state.task_name = self.name
                job_state.name = self.name.split('.')[-1]
                if args:
                    job_state.args = json.dumps(list(args))
                if kwargs:
                    job_state.kwargs = json.dumps(kwargs)
                job_state.origin = socket.gethostname()
                job_state.exception_class = exc_value.__class__.__name__
                job_state.exception_msg = exc_value.args[0] if exc_value.args else exc_type.__module__ + '.' + exc_type.__name__
                job_state.traceback = traceback.format_exc()
                job_state.save(using='force')
            raise

        # 起動 成功時 - Redis は上がっている。Celery は上がっている/上がっていない(is_complete == False のまま残る)
        job_state, is_created = JobState.objects.get_or_create(task_id=async_result.id)
        job_state.task_name = async_result.task_name
        job_state.name = async_result.task_name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        job_state.origin = socket.gethostname()
        job_state.save()

        return async_result

    def on_success(self, retval, task_id, args, kwargs):
        """Success handler - 正常時のハンドラー - Celery worker 側で呼ばれる"""
        job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
        job_state.task_name = self.name
        job_state.name = self.name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        if self.request.get('origin'):
            job_state.origin = self.request.get('origin')
        job_state.hostname = self.request.get('hostname')
        job_state.is_complete = True
        job_state.save()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Error handler - エラー時のハンドラー - Celery worker 側で呼ばれる
            - run() の atomic ブロック内であっても、別途 worker から呼ばれるので、記録は rollback されない
        """
        job_state, is_created = JobState.objects.get_or_create(task_id=task_id)
        job_state.task_name = self.name
        job_state.name = self.name.split('.')[-1]
        if args:
            job_state.args = json.dumps(list(args))
        if kwargs:
            job_state.kwargs = json.dumps(kwargs)
        if self.request.get('origin'):
            job_state.origin = self.request.get('origin')
        job_state.hostname = self.request.get('hostname')
        job_state.exception_class = exc.__class__.__name__
        job_state.exception_msg = str(exc)
        job_state.traceback = str(einfo)
        job_state.save()

    def run(self, *args, **kwargs):
        """The body of the task executed by workers."""
        raise NotImplementedError('Tasks must define the run method.')


class SampleTask(BaseHandlingTask):
    """サンプル タスク"""
    logger = logging.getLogger('prj')

    def run(self, is_error=False):
        self.logger.info('SampleTask start...')
        with transaction.atomic():
            if is_error:
                raise ValueError('エラーです')
        self.logger.info('SampleTask end.')

JobState モデル

ログの出力先のモデルです。

  • タスク名
  • 依頼時の引数
  • どのホストから起動されたか
  • どのホストで実行されたか
  • 正常に完了したか
  • Exception 時は、例外のクラス/メッセージ/traceback

がわかるようになっています。

app/models/job_state.py

from django.db import models


class JobState(models.Model):
    """ジョブ状態"""
    task_id = models.CharField('タスクID', max_length=255, blank=True, null=True, db_index=True)   # UUID
    task_name = models.CharField('タスク名', max_length=255, blank=True, null=True)     # 例: app.tasks.handling.SampleTask
    name = models.CharField('クラス名', max_length=255, blank=True, null=True)          # 例: SampleTask
    args = models.TextField('args', null=True, blank=True)
    kwargs = models.TextField('kwargs', null=True, blank=True)
    is_complete = models.BooleanField('完了', default=False)
    origin = models.CharField('origin', max_length=255, blank=True, null=True)  # Name of host that sent this task.
    hostname = models.CharField('hostname', max_length=255, blank=True, null=True)  # Node name of the worker instance executing the task.
    exception_class = models.CharField('例外クラス', max_length=255, null=True, blank=True, default='')
    exception_msg = models.CharField('例外メッセージ', max_length=255, null=True, blank=True, default='')
    traceback = models.TextField('traceback', null=True, blank=True, default='')

    created_at = models.DateTimeField('登録日時', auto_now_add=True, blank=True, null=True)
    updated_at = models.DateTimeField('更新日時', auto_now=True, blank=True, null=True)

    def __str__(self):
        return self.task_id if self.task_id else str(self.id)

adminサイト登録

adminサイトで JobState を見れるようにします。
そのままだと長くなるので、所々短縮してたりします。

app/admin.py

from django.contrib import admin
from app.models import JobState

class JobStateAdmin(admin.ModelAdmin):
    """ジョブ状態"""
    list_display = ('id', 'task_id_shorten', 'name', 'x_args', 'x_kwargs', 'is_complete', 'exception_class', 'origin', 'hostname', 'created_at')
    list_display_links = ('id', 'task_id_shorten', 'name')
    list_filter = ('is_complete',)
    search_fields = ['task_id', 'task_name', 'name']

    def task_id_shorten(self, obj):
        return obj.task_id[:8] + '...' if obj.task_id else ''
    task_id_shorten.short_description = 'タスクID'

    def x_args(self, obj):
        return obj.args[:20] + '...' if obj.args and len(obj.args) > 20 else obj.args
    x_args.short_description = 'args'

    def x_kwargs(self, obj):
        return obj.kwargs[:20] + '...' if obj.kwargs and len(obj.kwargs) > 20 else obj.kwargs
    x_kwargs.short_description = 'kwargs'

admin.site.register(JobState, JobStateAdmin)

settings

呼び出し側が transaction.atomic() だと、異常時に一緒に JobState も rollback されてしまうので、ちょっと強引ですが、もう一つ強制書き込み用の Database 接続を増やしてます。

そこまでしなくていいよ、という場合はお好みで。

DATABASE = {} の下に、以下を追加

prj/settings/local.py

# atmic ブロック内で Exception しても、JobState には書き込みできるよう、同じ設定で2つ目の接続を作る
DATABASES.update({'force': DATABASES['default']})

以上

荒削りのコードですが、一目瞭然で見れるようになりました。

正常時も含めると多すぎるという場合は、正常時に出している所をコメントアウトする。

また、cron ジョブ(Celery beat)で15分単位で流れるものは、これも多すぎるので、そもそも BaseHandlingTask を継承しないで、素の Task を継承する。

等で調整して下さい。

12
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?