Advanced Message Queuing Protocol (AMQP) モデルでは Producer -> Exchange -> Binding(topic exchange の場合のみ) -> Queue とメッセージは流れる。ここで中心的な役割を果たす Exchange は主に fanout/direct/topic の3パターンがある。このうち Kestrel もサポートしている fanout を触ってみた。
AMQP の3つの Exchange
1. Fanout Exchange
fan-out exchange ではメッセージは exchange とつながっている各キューに配信される。
2. Direct Exchange
direct exchange ではメッセージの routing key に応じて配信先キューが振り分けられる。
3. Topic Exchange
topic exchange ではメッセージの routing key とブローカーの binding key に応じて配信先キューが振り分けられる。
# Exchange 画像は RedHat の Enterprise MRG のマニュアルから
—
Kestrel での fanout
kestrel ではキュー名の命名規則により、fanout として扱えるようにしている。
If a queue name has a + in it (like “orders+audit”), it’s treated as a fanout queue, using the format <parent>+<child>. These queues belong to a parent queue — in this example, the “orders” queue. Every item written into a parent queue will also be written into each of its children.
fanout の使用例
RabbitMQ のサイトの pub/sub のチュートリアルを Kestrel に移植してみる。
Producer
プロデューサーはキュー名 “logs” に対してメッセージを送信する。プロデューサーは fanout を一切意識しない。
# emit_logs.py # http://www.rabbitmq.com/tutorials/tutorial-three-python.html import sys import kestrel host='127.0.0.1' port=22133 client = kestrel.Client(servers=['%s:%s'%(host, port)]) message = ' '.join(sys.argv[1:]) or "info: Hello World!" client.add('logs', message) # client.add('logs+alice', message) # direct exchange
Consumer
コンシューマーはキュー名”logs”のメッセージを受信。プログラム起動時の引数で、fanout 用の child name を指定するようにしている。
# receive_logs.py # http://www.rabbitmq.com/tutorials/tutorial-three-python.html import sys import kestrel host='127.0.0.1' port=22133 client = kestrel.Client(servers=['%s:%s'%(host, port)]) queue_name = 'logs' if len(sys.argv) > 1: queue_name += '+%s' % sys.argv[1] # fanout queue while True: try: body = client.next(queue_name, timeout=10) if body is not None: print " [%s] %r" % (queue_name, body) # キュー名全体[parent+child]を確認用に出力 except KeyboardInterrupt, err: client.abort(queue_name) sys.exit(0)
プログラムの実行
複数のターミナルを起動して実行させる。
まずは consumer を複数起動。
# terminal #1 $ python receive_logs.py [logs] 'info: Hello World!' [logs] 'test' # terminal #2 $ python receive_logs.py alice [logs+alice] 'info: Hello World!' [logs+alice] 'test' # terminal #3 $ python receive_logs.py bob [logs+bob] 'info: Hello World!' [logs+bob] 'test'
あとは producer がメッセージを送信すれば consumer に届く。
$ python emit_logs.py $ python emit_logs.py test
ファンアウトキューの特性
- 子供のキュー名を直接指定して、キュー操作を直接行うことも可能。
- 設定ファイルは親のものを利用。(queue_name=”foo+bar”のfanoutキューの場合、queue_name=”foo”のネームスペースで定義された設定が利用される)
- 親とは独立したジャーナルファイルを持つ(デフォルトは /var/spool/kestrel/ 以下)
fanout に関連する設定項目
kestrel の設定ファイル(production.scala/development.scala)で fanout 用キューの QueueBuilder に “fanoutOnly = true” の設定を追加すると、子供にしかメッセージは配信されなくなる。上の例では キュー名に “+” が含まれる “logs+alice”, “logs+bob” には配信されるが、 “logs” には配信されない。この機能を利用したうまいユースケースは思い浮かばない、、、
queues = new QueueBuilder { name = "logs" fanoutOnly = true }
RabbitMQ in Action での fanout の使い
“RabbitMQ in Action” では exchange はCh.2 Understanding messaging で、fanout の実例は、ユーザの画像投稿を例に §4.2.2 Parallel processing で解説されている。