RabbitMQ 4 + pika エラー発生時はキューに残す

RabbitMQ 4 + pikaを触ってみています。
ワーカー側でキューを処理中にエラーになったらどうなるか。


エラー発生


処理中にraise Exceptionしてみます。

  1. import pika, sys, os
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     channel.queue_declare(queue='hello',  durable=True)
  12.     def callback(ch, method, properties, body):
  13.         print(f" [x] Received {body}")
  14.         # エラー発生
  15.         raise Exception('callback error.')
  16.     channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
  17.     print(' [*] Waiting for messages. To exit press CTRL+C')
  18.     channel.start_consuming()
  19. if __name__ == '__main__':
  20.     try:
  21.         main()
  22.     except KeyboardInterrupt:
  23.         print('Interrupted')
  24.         try:
  25.             sys.exit(0)
  26.         except SystemExit:
  27.             os._exit(0)



試してみると、エラーが発生してもキューの内容は削除されました。



auto_ackとbasic_ack


薄々auto_ackを指定しているから、自動的に処理済になるんだろうなと。
auto_ackを使用せず、basic_ackで自前で応答するよう変更します。

  1. import pika, sys, os
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     # durable=Trueを追加
  12.     channel.queue_declare(queue='hello',  durable=True)
  13.     def callback(ch, method, properties, body):
  14.         print(f" [x] Received {body}")
  15.         #raise Exception('callback error.')
  16.         ch.basic_ack(delivery_tag = method.delivery_tag)
  17.     #channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
  18.     channel.basic_consume(queue='hello', on_message_callback=callback)
  19.     print(' [*] Waiting for messages. To exit press CTRL+C')
  20.     channel.start_consuming()
  21. if __name__ == '__main__':
  22.     try:
  23.         main()
  24.     except KeyboardInterrupt:
  25.         print('Interrupted')
  26.         try:
  27.             sys.exit(0)
  28.         except SystemExit:
  29.             os._exit(0)



basic_ackを送信しない限り処理済とならないよう変更できました。
関連記事

コメント

プロフィール

Author:symfo
blog形式だと探しにくいので、まとめサイト作成中です。
https://symfo.web.fc2.com/

PR

検索フォーム

月別アーカイブ