-
Notifications
You must be signed in to change notification settings - Fork 3
/
rmq_queue.py
115 lines (87 loc) · 3.74 KB
/
rmq_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
RabbitMQ wrapper.

Example:
    channel = RmqChannel()
    channel.declare_queue(RmqQueueName.TO_WORKER)
    channel.declare_queue(RmqQueueName.FROM_WORKER)
    channel.receive(RmqQueueName.TO_WORKER, recv)
    channel.start()

recv function:
    def recv(ch, method, properties, body):
        "(your data is in the body)"
        data = json.loads(body.decode())

send:
    channel.send(RmqQueueName.FROM_WORKER, {"some_data": "value"})
"""
from enum import Enum
import pika
import os
import json
# When true, RmqChannel ignores the RABBITMQ_* environment variables and
# connects with its built-in default settings instead.
DEBUG = False
class RmqQueueName(Enum):
    """Closed set of RabbitMQ queue names accepted by RmqChannel.

    Each member's ``value`` is the literal queue name passed to the broker.
    """

    # NOTE(review): presumably carries work items towards the worker --
    # confirm against the producers/consumers of this queue.
    TO_WORKER = 'KZ_HIKCAM_SCAN_TO_WORKER'

    # NOTE(review): presumably carries results back from the worker --
    # confirm against the producers/consumers of this queue.
    FROM_WORKER = 'KZ_HIKCAM_SCAN_FROM_WORKER'
class RmqChannel:
    """Blocking RabbitMQ connection that owns a single channel.

    The constructor connects immediately. Each setting is read from its
    ``RABBITMQ_*`` environment variable and falls back to a built-in default
    when the variable is unset; when the module-level ``DEBUG`` flag is true
    the environment is ignored and only the defaults are used.

    The object can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.

    NOTE(review): the pika calls below (``heartbeat_interval=``,
    ``basic_consume(callback, ..., no_ack=...)``) use the pika < 1.0
    signatures and are intentionally left unchanged -- confirm the pinned
    pika version before upgrading.
    """

    # Fallback connection settings, keyed by the environment variable that
    # overrides each of them.
    # NOTE(review): keeping a default password in source is a security risk;
    # it is retained only so existing deployments keep working. Prefer
    # supplying RABBITMQ_DEFAULT_PASS through the environment.
    _DEFAULT_SETTINGS = {
        'RABBITMQ_DEFAULT_USER': 'rabbit',
        'RABBITMQ_DEFAULT_PASS': 'carrotinuranus',
        'RABBITMQ_HOST': '192.168.0.10',
        'RABBITMQ_PORT': '5772',
        'RABBITMQ_HEARTBEAT': '0',
    }

    def __init__(self, prefetch_count=1):
        """Open the connection, create the channel and apply the QoS limit.

        Args:
            prefetch_count: value passed to ``basic_qos`` on the new channel.

        Raises:
            ValueError: if the configured port or heartbeat is not an integer.
            Exception: whatever pika raises while connecting or while setting
                up the channel; in the latter case the connection is closed
                before the error propagates.
        """
        settings = self._load_settings()
        host = settings['RABBITMQ_HOST']
        port = settings['RABBITMQ_PORT']

        print("connecting to {}:{}".format(host, port))
        params = pika.ConnectionParameters(
            host=host,
            port=int(port),
            credentials=pika.credentials.PlainCredentials(
                settings['RABBITMQ_DEFAULT_USER'],
                settings['RABBITMQ_DEFAULT_PASS'],
            ),
            heartbeat_interval=int(settings['RABBITMQ_HEARTBEAT']),
        )
        self.connection = pika.BlockingConnection(parameters=params)
        try:
            self.channel = self.connection.channel()
            self.channel.basic_qos(prefetch_count=prefetch_count)
        except Exception:
            # Do not leak the open socket when channel setup fails.
            if self.connection.is_open:
                self.connection.close()
            raise
        print("connected")

    @classmethod
    def _load_settings(cls):
        """Return the connection settings as a dict of strings."""
        if DEBUG:
            return dict(cls._DEFAULT_SETTINGS)
        return {
            key: os.environ.get(key, default)
            for key, default in cls._DEFAULT_SETTINGS.items()
        }

    def __enter__(self):
        """Return this channel wrapper for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the channel and connection; exceptions are not suppressed."""
        self.close()

    def declare_queue(self, name: RmqQueueName, durable=True):
        """Declare the queue ``name`` on the channel before using it.

        Args:
            name: queue to declare; its enum value is the broker queue name.
            durable: forwarded to ``queue_declare``.
        """
        self.channel.queue_declare(name.value, durable=durable)

    def delete_queue(self, name: RmqQueueName):
        """Delete the queue ``name`` via ``queue_delete``."""
        self.channel.queue_delete(name.value)

    def redeclare_queue(self, name: RmqQueueName, durable=True):
        """Delete the queue ``name`` and declare it again.

        Args:
            name: queue to recreate.
            durable: forwarded to :meth:`declare_queue`.
        """
        self.delete_queue(name)
        self.declare_queue(name, durable)

    def queue_length(self, name: RmqQueueName):
        """Return the ``message_count`` the broker reports for ``name``.

        A passive declare only inspects the queue and does not create it, so
        no other declaration flags are needed (the ones previously passed
        here were equal to pika's defaults).
        """
        declared = self.channel.queue_declare(queue=name.value, passive=True)
        return declared.method.message_count

    def send(self, queue_name: RmqQueueName, data: dict):
        """Publish ``data`` as JSON to ``queue_name`` via the default exchange.

        Args:
            queue_name: target queue; its value is used as the routing key.
            data: JSON-serialisable object; non-ASCII characters are kept
                as-is (``ensure_ascii=False``).
        """
        self.channel.basic_publish(
            exchange='',
            routing_key=queue_name.value,
            body=json.dumps(data, ensure_ascii=False),
            # delivery_mode=2 marks the message as persistent.
            properties=pika.BasicProperties(delivery_mode=2),
        )

    def receive(self, queue_name: RmqQueueName, callback, no_ack=False):
        """Register ``callback`` as a consumer of ``queue_name``.

        Args:
            queue_name: queue to consume from.
            callback: ``func(ch, method, properties, body)``.
            no_ack: forwarded to ``basic_consume``.
        """
        self.channel.basic_consume(callback,
                                   queue=queue_name.value,
                                   no_ack=no_ack)

    def start(self):
        """Enter the channel's consuming loop (``start_consuming``)."""
        self.channel.start_consuming()

    def close(self):
        """Close the channel and then the underlying connection.

        Parts that are already closed are skipped, so calling this more than
        once is safe. The connection is closed even if closing the channel
        raises.
        """
        try:
            if self.channel.is_open:
                self.channel.close()
        finally:
            if self.connection.is_open:
                self.connection.close()