|
28 | 28 | class QueueListener(object): |
29 | 29 | _sentinel_item = None |
30 | 30 |
|
31 | | - def __init__(self, queue, queue_get_timeout, *handlers): |
| 31 | + def __init__(self, queue, *handlers, **kwargs): |
32 | 32 | self.queue = queue |
33 | | - self.queue_get_timeout = queue_get_timeout |
| 33 | + self.queue_get_timeout = kwargs.get("queue_get_timeout", None) |
34 | 34 | self.handlers = handlers |
35 | 35 | self._stop_nowait = threading.Event() |
36 | 36 | self._stop = threading.Event() |
37 | 37 | self._thread = None |
38 | 38 |
|
39 | | - def dequeue(self, block=True, timeout=None): |
| 39 | + def dequeue(self, block=True): |
40 | 40 | """Dequeue a record and return item.""" |
41 | | - return self.queue.get(block, timeout) |
| 41 | + return self.queue.get(block, self.queue_get_timeout) |
42 | 42 |
|
43 | 43 | def start(self): |
44 | 44 | """Start the listener. |
@@ -83,7 +83,7 @@ def _monitor(self): |
83 | 83 | has_task_done = hasattr(q, 'task_done') |
84 | 84 | while not self._stop.isSet(): |
85 | 85 | try: |
86 | | - record = self.dequeue(True, self.queue_get_timeout) |
| 86 | + record = self.dequeue(True) |
87 | 87 | if record is self._sentinel_item: |
88 | 88 | break |
89 | 89 | self.handle(record) |
@@ -161,8 +161,8 @@ def __init__(self, endpoint, project, token, api_base="api/v1", |
161 | 161 | "start_test_item", "finish_test_item", "log"] |
162 | 162 |
|
163 | 163 | self.queue = queue.Queue() |
164 | | - self.listener = QueueListener(self.queue, queue_get_timeout, |
165 | | - self.process_item) |
| 164 | + self.listener = QueueListener(self.queue, self.process_item, |
| 165 | + queue_get_timeout=queue_get_timeout) |
166 | 166 | self.listener.start() |
167 | 167 | self.lock = threading.Lock() |
168 | 168 |
|
|
0 commit comments