Kestrelのfanoutを使ってみる

Advanced Message Queuing Protocol (AMQP) モデルでは Producer -> Exchange -> Binding(topic exchange の場合のみ) -> Queue とメッセージは流れる。ここで中心的な役割を果たす Exchange は主に fanout/direct/topic の3パターンがある。このうち Kestrel もサポートしている fanout を触ってみた。

AMQP の3つの Exchange

1. Fanout Exchange
fan-out exchange ではメッセージは exchange とつながっている各キューに配信される。

2. Direct Exchange
direct exchange ではメッセージの routing key に応じて配信先キューが振り分けられる。

3. Topic Exchange
topic exchange ではメッセージの routing key とブローカーの binding key に応じて配信先キューが振り分けられる。

# Exchange 画像は RedHat の Enterprise MRG のマニュアルから

Kestrel での fanout

kestrel ではキュー名の命名規則により、fanout として扱えるようにしている。

If a queue name has a + in it (like “orders+audit”), it’s treated as a fanout queue, using the format <parent>+<child>. These queues belong to a parent queue — in this example, the “orders” queue. Every item written into a parent queue will also be written into each of its children.

https://github.com/robey/kestrel/blob/master/docs/guide.md

fanout の使用例

RabbitMQ のサイトの pub/sub のチュートリアルを Kestrel に移植してみる。


Producer
プロデューサーはキュー名 “logs” に対してメッセージを送信する。プロデューサーは fanout を一切意識しない。

# emit_logs.py
# http://www.rabbitmq.com/tutorials/tutorial-three-python.html
import sys
import kestrel
host='127.0.0.1'
port=22133

client = kestrel.Client(servers=['%s:%s'%(host, port)])
message = ' '.join(sys.argv[1:]) or "info: Hello World!"

client.add('logs', message)
# client.add('logs+alice', message) # direct exchange

Consumer
コンシューマーはキュー名”logs”のメッセージを受信。プログラム起動時の引数で、fanout 用の child name を指定するようにしている。

# receive_logs.py
# http://www.rabbitmq.com/tutorials/tutorial-three-python.html
import sys
import kestrel
host='127.0.0.1'
port=22133
client = kestrel.Client(servers=['%s:%s'%(host, port)])

queue_name = 'logs'
if len(sys.argv) > 1:
    queue_name += '+%s' % sys.argv[1] # fanout queue

while True:
    try:
        body = client.next(queue_name, timeout=10)
        if body is not None:
            print " [%s] %r" % (queue_name, body) # キュー名全体[parent+child]を確認用に出力
    except KeyboardInterrupt, err:
         client.abort(queue_name)
         sys.exit(0)

プログラムの実行
複数のターミナルを起動して実行させる。
まずは consumer を複数起動。

# terminal #1
$ python receive_logs.py
 [logs] 'info: Hello World!'
 [logs] 'test'

# terminal #2
$ python receive_logs.py alice
 [logs+alice] 'info: Hello World!'
 [logs+alice] 'test'

# terminal #3
$ python receive_logs.py bob
 [logs+bob] 'info: Hello World!'
 [logs+bob] 'test'

あとは producer がメッセージを送信すれば consumer に届く。

$ python emit_logs.py
$ python emit_logs.py test

ファンアウトキューの特性

  • 子供のキュー名を直接指定して、キュー操作を直接行うことも可能。
  • 設定ファイルは親のものを利用。(queue_name=”foo+bar”のfanoutキューの場合、queue_name=”foo”のネームスペースで定義された設定が利用される)
  • 親とは独立したジャーナルファイルを持つ(デフォルトは /var/spool/kestrel/ 以下)

fanout に関連する設定項目
kestrel の設定ファイル(production.scala/development.scala)で fanout 用キューの QueueBuilder に “fanoutOnly = true” の設定を追加すると、子供にしかメッセージは配信されなくなる。上の例では キュー名に “+” が含まれる “logs+alice”, “logs+bob” には配信されるが、 “logs” には配信されない。この機能を利用したうまいユースケースは思い浮かばない、、、

  queues = new QueueBuilder {
    name = "logs"
    fanoutOnly = true
  }

RabbitMQ in Action での fanout の使い
“RabbitMQ in Action” では exchange はCh.2 Understanding messaging で、fanout の実例は、ユーザの画像投稿を例に §4.2.2 Parallel processing で解説されている。

Leave a comment

  • Design a site like this with WordPress.com
    Get started