RabbitMQ 4 + pika 複数ワーカーで処理で処理待ちワーカーへの割当

RabbitMQ 4 + pikaの動作を確認しています。
basic_ackで処理完了を通知するところまで見てみました。
RabbitMQ 4 + pika エラー発生時はキューに残す

こちらを参考にさせて頂いているのですが
RabbitMQでのメッセージ消失の対策
1つのキューに対して複数のワーカーで待ち受けるパターンの場合、均等に分散されるとのこと。
早速試してみます。


サンプル


メッセージを処理する際、1秒/10秒とwaitを入れるようにしてみました。

・receive_normal.py

  1. import pika, sys, os, time
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     # durable=Trueを追加
  12.     channel.queue_declare(queue='hello',  durable=True)
  13.     def callback(ch, method, properties, body):
  14.         print(f" [x] Normal Received {body}")
  15.         # 1秒待ち
  16.         time.sleep(1)
  17.         ch.basic_ack(delivery_tag = method.delivery_tag)
  18.     #channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
  19.     channel.basic_consume(queue='hello', on_message_callback=callback)
  20.     print(' [*] Waiting for messages. To exit press CTRL+C')
  21.     channel.start_consuming()
  22. if __name__ == '__main__':
  23.     try:
  24.         main()
  25.     except KeyboardInterrupt:
  26.         print('Interrupted')
  27.         try:
  28.             sys.exit(0)
  29.         except SystemExit:
  30.             os._exit(0)




・receive_slow.py

  1. import pika, sys, os, time
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     # durable=Trueを追加
  12.     channel.queue_declare(queue='hello',  durable=True)
  13.     def callback(ch, method, properties, body):
  14.         print(f" [x] Slow Received {body}")
  15.         # 10秒待ち
  16.         time.sleep(10)
  17.         ch.basic_ack(delivery_tag = method.delivery_tag)
  18.     #channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
  19.     channel.basic_consume(queue='hello', on_message_callback=callback)
  20.     print(' [*] Waiting for messages. To exit press CTRL+C')
  21.     channel.start_consuming()
  22. if __name__ == '__main__':
  23.     try:
  24.         main()
  25.     except KeyboardInterrupt:
  26.         print('Interrupted')
  27.         try:
  28.             sys.exit(0)
  29.         except SystemExit:
  30.             os._exit(0)




送信側では10個のメッセージを送信します。
・send.py

  1. import pika
  2. # ユーザー名とパスワード
  3. credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  4. # 接続パラメーター作成
  5. connect_param = pika.ConnectionParameters(
  6.                     host='192.168.11.202',
  7.                     credentials=credentials)
  8. # コネクション作成
  9. connection = pika.BlockingConnection(connect_param)
  10. channel = connection.channel()
  11. # durable=Trueで永続化してい
  12. channel.queue_declare(queue='hello', durable=True)
  13. for i in range(10):
  14.     channel.basic_publish(exchange='',
  15.                         routing_key='hello',
  16.                         body='Hello World!',
  17.                         properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent))
  18.     print("[x] Sent 'Hello World!")



receive_normal.py と receive_slow.py を実行してメッセージを処理してみます。

$ python receive_normal.py
[*] Waiting for messages. To exit press CTRL+C
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'


$ python receive_slow.py
[*] Waiting for messages. To exit press CTRL+C
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'




receive_normal.py は5つのメッセージの処理が完了しているのに
新規のキューは割り当てられません。
均等に処理が振り分けられました。

receive_slow.py の処理中、Ctrl + Cで終了すると残りのキューは
receive_normal.py へ割り当てられました。

$ python receive_normal.py
[*] Waiting for messages. To exit press CTRL+C
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'


$ python receive_slow.py
[*] Waiting for messages. To exit press CTRL+C
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
[x] Slow Received b'Hello World!'
^CInterrupted





basic_qos


basic_qosでprefetch_countを指定すれば良いとのこと。
basic_qos
pikaではデフォルトでまとめてキューを受け取るようですね。
1回のフェッチサイズを1にすることで、都度キューの内容を確認してくれるようです。

この指定が必要なのは受信側だけです。
receive_normal.py と receive_slow.pyにbasic_qosの指定を追加します。

・receive_normal.py

  1. import pika, sys, os, time
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     # durable=Trueを追加
  12.     channel.queue_declare(queue='hello',  durable=True)
  13.     def callback(ch, method, properties, body):
  14.         print(f" [x] Normal Received {body}")
  15.         # 1秒待ち
  16.         time.sleep(1)
  17.         ch.basic_ack(delivery_tag = method.delivery_tag)
  18.     # # prefetch_countを指定
  19.     channel.basic_qos(prefetch_count=1)
  20.     channel.basic_consume(queue='hello', on_message_callback=callback)
  21.     print(' [*] Waiting for messages. To exit press CTRL+C')
  22.     channel.start_consuming()
  23. if __name__ == '__main__':
  24.     try:
  25.         main()
  26.     except KeyboardInterrupt:
  27.         print('Interrupted')
  28.         try:
  29.             sys.exit(0)
  30.         except SystemExit:
  31.             os._exit(0)



・receive_slow.py

  1. import pika, sys, os, time
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     # durable=Trueを追加
  12.     channel.queue_declare(queue='hello',  durable=True)
  13.     def callback(ch, method, properties, body):
  14.         print(f" [x] Slow Received {body}")
  15.         # 10秒待ち
  16.         time.sleep(10)
  17.         ch.basic_ack(delivery_tag = method.delivery_tag)
  18.     # prefetch_countを指定
  19.     channel.basic_qos(prefetch_count=1)
  20.     channel.basic_consume(queue='hello', on_message_callback=callback)
  21.     print(' [*] Waiting for messages. To exit press CTRL+C')
  22.     channel.start_consuming()
  23. if __name__ == '__main__':
  24.     try:
  25.         main()
  26.     except KeyboardInterrupt:
  27.         print('Interrupted')
  28.         try:
  29.             sys.exit(0)
  30.         except SystemExit:
  31.             os._exit(0)



これで時間的に均等に処理が割り当てられました。

$ python receive_normal.py
[*] Waiting for messages. To exit press CTRL+C
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'
[x] Normal Received b'Hello World!'


$ python receive_slow.py
[*] Waiting for messages. To exit press CTRL+C
[x] Slow Received b'Hello World!'




画像の縮小処理など、処理対象により実行時間が異なる処理を分散している場合には
必要な設定なんじゃないかと思います。

RabbitMQ 4 + pika エラー発生時はキューに残す

RabbitMQ 4 + pikaを触ってみています。
ワーカー側でキューを処理中にエラーになったらどうなるか。


エラー発生


処理中にraise Exceptionしてみます。

  1. import pika, sys, os
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     channel.queue_declare(queue='hello',  durable=True)
  12.     def callback(ch, method, properties, body):
  13.         print(f" [x] Received {body}")
  14.         # エラー発生
  15.         raise Exception('callback error.')
  16.     channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
  17.     print(' [*] Waiting for messages. To exit press CTRL+C')
  18.     channel.start_consuming()
  19. if __name__ == '__main__':
  20.     try:
  21.         main()
  22.     except KeyboardInterrupt:
  23.         print('Interrupted')
  24.         try:
  25.             sys.exit(0)
  26.         except SystemExit:
  27.             os._exit(0)



試してみると、エラーが発生してもキューの内容は削除されました。



auto_ackとbasic_ack


薄々auto_ackを指定しているから、自動的に処理済になるんだろうなと。
auto_ackを使用せず、basic_ackで自前で応答するよう変更します。

  1. import pika, sys, os
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     # durable=Trueを追加
  12.     channel.queue_declare(queue='hello',  durable=True)
  13.     def callback(ch, method, properties, body):
  14.         print(f" [x] Received {body}")
  15.         #raise Exception('callback error.')
  16.         ch.basic_ack(delivery_tag = method.delivery_tag)
  17.     #channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
  18.     channel.basic_consume(queue='hello', on_message_callback=callback)
  19.     print(' [*] Waiting for messages. To exit press CTRL+C')
  20.     channel.start_consuming()
  21. if __name__ == '__main__':
  22.     try:
  23.         main()
  24.     except KeyboardInterrupt:
  25.         print('Interrupted')
  26.         try:
  27.             sys.exit(0)
  28.         except SystemExit:
  29.             os._exit(0)



basic_ackを送信しない限り処理済とならないよう変更できました。

RabbitMQ 4 + pika メッセージの永続化

RabbitMQ、そういえばサーバーを再移動するをキューが消えるんでした。
RabbitMQでキューの内容を永続化する

pikaだと永続化の指定はどうなるのか。


durableとdelivery_mode


こちらを参考にさせていただきました。
RabbitMQでのメッセージ消失の対策


queue_declareでdurable=Trueを指定。
basic_publishでpropertiesにpika.DeliveryMode.Persistentを指定すればOKでした。

なお、pika.DeliveryModeは
pika.DeliveryMode.Transient
pika.DeliveryMode.Persistent
の2つで、デフォルトがTransient(永続化なし)のようです

また、durable=Trueの指定は受信側にも必要でした。

・send.py

  1. import pika
  2. # ユーザー名とパスワード
  3. credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  4. # 接続パラメーター作成
  5. connect_param = pika.ConnectionParameters(
  6.                     host='192.168.11.202',
  7.                     credentials=credentials)
  8. # コネクション作成
  9. connection = pika.BlockingConnection(connect_param)
  10. channel = connection.channel()
  11. # durable=Trueで永続化してい
  12. channel.queue_declare(queue='hello', durable=True)
  13. #
  14. channel.basic_publish(exchange='',
  15.                      routing_key='hello',
  16.                      body='Hello World!',
  17.                      properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent))
  18. print("[x] Sent 'Hello World!")



・recive.py

  1. import pika, sys, os
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     # durable=Trueを追加
  12.     channel.queue_declare(queue='hello',  durable=True)
  13.     def callback(ch, method, properties, body):
  14.         print(f" [x] Received {body}")
  15.     channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
  16.     print(' [*] Waiting for messages. To exit press CTRL+C')
  17.     channel.start_consuming()
  18. if __name__ == '__main__':
  19.     try:
  20.         main()
  21.     except KeyboardInterrupt:
  22.         print('Interrupted')
  23.         try:
  24.             sys.exit(0)
  25.         except SystemExit:
  26.             os._exit(0)




これでサーバーを再起動してもキューの内容が保持されていました。


RabbitMQ 4 にPythonクライアント(pika)からメッセージを送信、受信する

以前も試したことがありますが、各種バージョンが上がっているので再度試してみます。
RabbitMQにPythonクライアント(pika)からメッセージを送信、受信する



pikaのインストール


接続用のライブラリpikaをインストールします。

$ pip install pika



aptでインストールする場合

$ sudo apt install python3-pika



pipでは1.3.2、aptでは1.2.0がインストールされました。



送信サンプル


送信のサンプルです。

・send.py

  1. import pika
  2. # ユーザー名とパスワード
  3. credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  4. # 接続パラメーター作成
  5. connect_param = pika.ConnectionParameters(
  6.                     host='192.168.11.202',
  7.                     credentials=credentials)
  8. # コネクション作成
  9. connection = pika.BlockingConnection(connect_param)
  10. channel = connection.channel()
  11. channel.queue_declare(queue='hello')
  12. channel.basic_publish(exchange='',
  13.                      routing_key='hello',
  14.                      body='Hello World!')
  15. print("[x] Sent 'Hello World!")
  16. connection.close()



ローカルホストアクセスの場合は必要ありませんが、
リモートサーバーに接続する場合はユーザーとパスワードの指定が必要です。

こちらで作成したユーザーを指定しました。
RabbitMQ 4 Web Interfaceの有効化とログインユーザーの作成


管理画面を見ると、キューに1つ追加されたことがわかります。
c54_01.png


コマンドでも現在のキューの内容を確認できます。

$ sudo rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 1





受信のサンプル


受信のサンプルはこうなりました。

・receive.py

  1. import pika, sys, os
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     channel.queue_declare(queue='hello')
  12.     def callback(ch, method, properties, body):
  13.         print(f" [x] Received {body}")
  14.     channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
  15.     print(' [*] Waiting for messages. To exit press CTRL+C')
  16.     channel.start_consuming()
  17. if __name__ == '__main__':
  18.     try:
  19.         main()
  20.     except KeyboardInterrupt:
  21.         print('Interrupted')
  22.         try:
  23.             sys.exit(0)
  24.         except SystemExit:
  25.             os._exit(0)




実行するとメッセージの待受状態になります。
send.pyを実行すると、メッセージが表示されます。

$ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World!'



RabbitMQ 4 Web Interfaceの有効化とログインユーザーの作成

RabbitMQをUbuntu 24.04にインストールしました。
RabbitMQ 4 をUbuntu Server 24.04へインストール

以前調べたときはwebインターフェースがありました。
RabbitMQ Web Interfaceの有効化とログインユーザー

現在も使用できるのか調べてみます。


マネージメントプラグインの有効化


Management Plugin
こちらに管理用プラグインについて解説があります。

ひとまず、webインターフェースを有効化してみます。

$ rabbitmq-plugins enable rabbitmq_management
...略
Only root or rabbitmq can run rabbitmq-plugins



rootで実行する必要があるようです。
sudoを付けて実行。

$ sudo rabbitmq-plugins enable rabbitmq_management

Enabling plugins on node rabbit@rabbit:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@rabbit...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.



ポート15672でwebサーバーが起動します。
http://[サーバーIP]:15672/にブラウザでアクセスするとログイン画面が表示されました。

c53_01.png



ユーザー名とパスワード


管理画面にログインするデフォルトパスワードは用意されていないようです。
コマンドでログインユーザーを作成し、パーミッションを設定します。

$ sudo rabbitmqctl add_user symfo P@ssw0rd
$ sudo rabbitmqctl set_user_tags symfo administrator
$ sudo rabbitmqctl set_permissions -p / symfo ".*" ".*" ".*"



設定したIDとパスワードを入力すると、管理画面にログインできました。

c53_02.png


プロフィール

Author:symfo
blog形式だと探しにくいので、まとめサイト作成中です。
https://symfo.web.fc2.com/

PR

検索フォーム

月別アーカイブ