RabbitMQ 4 + pika 複数ワーカーで処理で処理待ちワーカーへの割当
RabbitMQ 4 + pikaの動作を確認しています。basic_ackで処理完了を通知するところまで見てみました。
RabbitMQ 4 + pika エラー発生時はキューに残す
こちらを参考にさせて頂いているのですが
RabbitMQでのメッセージ消失の対策
1つのキューに対して複数のワーカーで待ち受けるパターンの場合、均等に分散されるとのこと。
早速試してみます。
サンプル
メッセージを処理する際、1秒/10秒とwaitを入れるようにしてみました。
・receive_normal.py
- import pika, sys, os, time
- def main():
- # ユーザー名とパスワード
- credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
- # 接続パラメーター作成
- connect_param = pika.ConnectionParameters(
- host='192.168.11.202',
- credentials=credentials)
- connection = pika.BlockingConnection(connect_param)
- channel = connection.channel()
- # durable=Trueを追加
- channel.queue_declare(queue='hello', durable=True)
- def callback(ch, method, properties, body):
- print(f" [x] Normal Received {body}")
- # 1秒待ち
- time.sleep(1)
- ch.basic_ack(delivery_tag = method.delivery_tag)
- #channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
- channel.basic_consume(queue='hello', on_message_callback=callback)
- print(' [*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
- if __name__ == '__main__':
- try:
- main()
- except KeyboardInterrupt:
- print('Interrupted')
- try:
- sys.exit(0)
- except SystemExit:
- os._exit(0)
・receive_slow.py
- import pika, sys, os, time
- def main():
- # ユーザー名とパスワード
- credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
- # 接続パラメーター作成
- connect_param = pika.ConnectionParameters(
- host='192.168.11.202',
- credentials=credentials)
- connection = pika.BlockingConnection(connect_param)
- channel = connection.channel()
- # durable=Trueを追加
- channel.queue_declare(queue='hello', durable=True)
- def callback(ch, method, properties, body):
- print(f" [x] Slow Received {body}")
- # 10秒待ち
- time.sleep(10)
- ch.basic_ack(delivery_tag = method.delivery_tag)
- #channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
- channel.basic_consume(queue='hello', on_message_callback=callback)
- print(' [*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
- if __name__ == '__main__':
- try:
- main()
- except KeyboardInterrupt:
- print('Interrupted')
- try:
- sys.exit(0)
- except SystemExit:
- os._exit(0)
送信側では10個のメッセージを送信します。
・send.py
- import pika
- # ユーザー名とパスワード
- credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
- # 接続パラメーター作成
- connect_param = pika.ConnectionParameters(
- host='192.168.11.202',
- credentials=credentials)
- # コネクション作成
- connection = pika.BlockingConnection(connect_param)
- channel = connection.channel()
- # durable=Trueで永続化してい
- channel.queue_declare(queue='hello', durable=True)
- for i in range(10):
- channel.basic_publish(exchange='',
- routing_key='hello',
- body='Hello World!',
- properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent))
- print("[x] Sent 'Hello World!")
receive_normal.py と receive_slow.py を実行してメッセージを処理してみます。
$ python receive_normal.py
[*] Waiting for messages. To exit press CTRL+C
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
$ python receive_slow.py
[*] Waiting for messages. To exit press CTRL+C
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
receive_normal.py は5つのメッセージの処理が完了しているのに
新規のキューは割り当てられません。
均等に処理が振り分けられました。
receive_slow.py の処理中、Ctrl + Cで終了すると残りのキューは
receive_normal.py へ割り当てられました。
$ python receive_normal.py
[*] Waiting for messages. To exit press CTRL+C
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
$ python receive_slow.py
[*] Waiting for messages. To exit press CTRL+C
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
^CInterrupted
basic_qos
basic_qosでprefetch_countを指定すれば良いとのこと。
basic_qos
pikaではデフォルトでまとめてキューを受け取るようですね。
1回のフェッチサイズを1にすることで、都度キューの内容を確認してくれるようです。
この指定が必要なのは受信側だけです。
receive_normal.py と receive_slow.pyにbasic_qosの指定を追加します。
・receive_normal.py
- import pika, sys, os, time
- def main():
- # ユーザー名とパスワード
- credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
- # 接続パラメーター作成
- connect_param = pika.ConnectionParameters(
- host='192.168.11.202',
- credentials=credentials)
- connection = pika.BlockingConnection(connect_param)
- channel = connection.channel()
- # durable=Trueを追加
- channel.queue_declare(queue='hello', durable=True)
- def callback(ch, method, properties, body):
- print(f" [x] Normal Received {body}")
- # 1秒待ち
- time.sleep(1)
- ch.basic_ack(delivery_tag = method.delivery_tag)
- # # prefetch_countを指定
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(queue='hello', on_message_callback=callback)
- print(' [*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
- if __name__ == '__main__':
- try:
- main()
- except KeyboardInterrupt:
- print('Interrupted')
- try:
- sys.exit(0)
- except SystemExit:
- os._exit(0)
・receive_slow.py
- import pika, sys, os, time
- def main():
- # ユーザー名とパスワード
- credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
- # 接続パラメーター作成
- connect_param = pika.ConnectionParameters(
- host='192.168.11.202',
- credentials=credentials)
- connection = pika.BlockingConnection(connect_param)
- channel = connection.channel()
- # durable=Trueを追加
- channel.queue_declare(queue='hello', durable=True)
- def callback(ch, method, properties, body):
- print(f" [x] Slow Received {body}")
- # 10秒待ち
- time.sleep(10)
- ch.basic_ack(delivery_tag = method.delivery_tag)
- # prefetch_countを指定
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(queue='hello', on_message_callback=callback)
- print(' [*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
- if __name__ == '__main__':
- try:
- main()
- except KeyboardInterrupt:
- print('Interrupted')
- try:
- sys.exit(0)
- except SystemExit:
- os._exit(0)
これで時間的に均等に処理が割り当てられました。
$ python receive_normal.py
[*] Waiting for messages. To exit press CTRL+C
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
$ python receive_slow.py
[*] Waiting for messages. To exit press CTRL+C
[x] Slow Received b'Hello World!'
画像の縮小処理など、処理対象により実行時間が異なる処理を分散している場合には
必要な設定なんじゃないかと思います。