RabbitMQ 4 + pika エラー発生時はキューに残す
RabbitMQ 4 + pikaを触ってみています。ワーカー側でキューを処理中にエラーになったらどうなるか。
エラー発生
処理中にraise Exceptionしてみます。
- import pika, sys, os
- def main():
- # ユーザー名とパスワード
- credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
- # 接続パラメーター作成
- connect_param = pika.ConnectionParameters(
- host='192.168.11.202',
- credentials=credentials)
- connection = pika.BlockingConnection(connect_param)
- channel = connection.channel()
- channel.queue_declare(queue='hello', durable=True)
- def callback(ch, method, properties, body):
- print(f" [x] Received {body}")
- # エラー発生
- raise Exception('callback error.')
- channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
- print(' [*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
- if __name__ == '__main__':
- try:
- main()
- except KeyboardInterrupt:
- print('Interrupted')
- try:
- sys.exit(0)
- except SystemExit:
- os._exit(0)
試してみると、エラーが発生してもキューの内容は削除されました。
auto_ackとbasic_ack
薄々auto_ackを指定しているから、自動的に処理済になるんだろうなと。
auto_ackを使用せず、basic_ackで自前で応答するよう変更します。
- import pika, sys, os
- def main():
- # ユーザー名とパスワード
- credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
- # 接続パラメーター作成
- connect_param = pika.ConnectionParameters(
- host='192.168.11.202',
- credentials=credentials)
- connection = pika.BlockingConnection(connect_param)
- channel = connection.channel()
- # durable=Trueを追加
- channel.queue_declare(queue='hello', durable=True)
- def callback(ch, method, properties, body):
- print(f" [x] Received {body}")
- #raise Exception('callback error.')
- ch.basic_ack(delivery_tag = method.delivery_tag)
- #channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
- channel.basic_consume(queue='hello', on_message_callback=callback)
- print(' [*] Waiting for messages. To exit press CTRL+C')
- channel.start_consuming()
- if __name__ == '__main__':
- try:
- main()
- except KeyboardInterrupt:
- print('Interrupted')
- try:
- sys.exit(0)
- except SystemExit:
- os._exit(0)
basic_ackを送信しない限り処理済とならないよう変更できました。
- 関連記事
コメント