-
Notifications
You must be signed in to change notification settings - Fork 413
/
worker.py
55 lines (36 loc) · 1.39 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from datetime import timedelta
from functools import partial
from flask import current_app
import logging
from rq import get_current_job
from rq.decorators import job as rq_job
from bi import (
create_app,
settings,
redis_connection,
rq_redis_connection,
)
from bi.tasks.worker import Queue
default_operational_queues = ["periodic", "emails", "default"]
default_query_queues = ["scheduled_queries", "queries", "schemas"]
default_queues = default_operational_queues + default_query_queues
class StatsdRecordingJobDecorator(rq_job): # noqa
"""
RQ Job Decorator mixin that uses our Queue class to ensure metrics are accurately incremented in Statsd
"""
queue_class = Queue
job = partial(StatsdRecordingJobDecorator, connection=rq_redis_connection, failure_ttl=settings.JOB_DEFAULT_FAILURE_TTL)
class CurrentJobFilter(logging.Filter):
def filter(self, record):
current_job = get_current_job()
record.job_id = current_job.id if current_job else ""
record.job_func_name = current_job.func_name if current_job else ""
return True
def get_job_logger(name):
logger = logging.getLogger("rq.job." + name)
handler = logging.StreamHandler()
handler.formatter = logging.Formatter(settings.RQ_WORKER_JOB_LOG_FORMAT)
handler.addFilter(CurrentJobFilter())
logger.addHandler(handler)
logger.propagate = False
return logger