RabbitMQ 4 + pika メッセージの永続化

RabbitMQ、そういえばサーバーを再移動するをキューが消えるんでした。
RabbitMQでキューの内容を永続化する

pikaだと永続化の指定はどうなるのか。


durableとdelivery_mode


こちらを参考にさせていただきました。
RabbitMQでのメッセージ消失の対策


queue_declareでdurable=Trueを指定。
basic_publishでpropertiesにpika.DeliveryMode.Persistentを指定すればOKでした。

なお、pika.DeliveryModeは
pika.DeliveryMode.Transient
pika.DeliveryMode.Persistent
の2つで、デフォルトがTransient(永続化なし)のようです

また、durable=Trueの指定は受信側にも必要でした。

・send.py

  1. import pika
  2. # ユーザー名とパスワード
  3. credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  4. # 接続パラメーター作成
  5. connect_param = pika.ConnectionParameters(
  6.                     host='192.168.11.202',
  7.                     credentials=credentials)
  8. # コネクション作成
  9. connection = pika.BlockingConnection(connect_param)
  10. channel = connection.channel()
  11. # durable=Trueで永続化してい
  12. channel.queue_declare(queue='hello', durable=True)
  13. #
  14. channel.basic_publish(exchange='',
  15.                      routing_key='hello',
  16.                      body='Hello World!',
  17.                      properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent))
  18. print("[x] Sent 'Hello World!")



・recive.py

  1. import pika, sys, os
  2. def main():
  3.     # ユーザー名とパスワード
  4.     credentials= pika.PlainCredentials('symfo', 'P@ssw0rd')
  5.     # 接続パラメーター作成
  6.     connect_param = pika.ConnectionParameters(
  7.                         host='192.168.11.202',
  8.                         credentials=credentials)
  9.     connection = pika.BlockingConnection(connect_param)
  10.     channel = connection.channel()
  11.     # durable=Trueを追加
  12.     channel.queue_declare(queue='hello',  durable=True)
  13.     def callback(ch, method, properties, body):
  14.         print(f" [x] Received {body}")
  15.     channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
  16.     print(' [*] Waiting for messages. To exit press CTRL+C')
  17.     channel.start_consuming()
  18. if __name__ == '__main__':
  19.     try:
  20.         main()
  21.     except KeyboardInterrupt:
  22.         print('Interrupted')
  23.         try:
  24.             sys.exit(0)
  25.         except SystemExit:
  26.             os._exit(0)




これでサーバーを再起動してもキューの内容が保持されていました。


関連記事

コメント

プロフィール

Author:symfo
blog形式だと探しにくいので、まとめサイト作成中です。
https://symfo.web.fc2.com/

PR

検索フォーム

月別アーカイブ