要約
Publish/Subsribe
複数のコンシューマに対してメッセージを配信する方法を説明する。
このパターンは一般的に Publish/Subsribe
と呼ばれている。
Publishされたメッセージを全てのConsumerへブロードキャストする方法を説明する。
Exchanges
- Producer ・・・ メッセージを送信するアプリケーション(直接キューにメッセージをPublishしない)
- Queue ・・・ メッセージを格納する箱
- Consumer ・・・ アプリケーションから送信されたメッセージを受信する(Queueからメッセージ取得する)
- Exchange ・・・ Producerからのメッセージを受信しルールに従いQueueに格納する。
- Bindings ・・・ ExchangeとQueueの関連を定義
ProducerはExchangeに対してメッセージをPublishする。
Exchangeのタイプは複数ある(direct/topic/headers/fanoutなど)。
fanoutはBindingされている複数のキューにメッセージをブロードキャストする。
Exchangeの一覧
Exchange一覧の表示
sudo rabbitmqctl list_exchanges
デフォルトでは amq.xxx(amq.fanoutなど)
と (名前なし)direct
Exchangeが存在している。
The default exchange
下記はデフォルトのExchangeを使用して'hello'というメッセージを'hello'というキューにPublishする。
channel.default_exchange.publish('hello', routing_key: 'hello')
デフォルトのExcange使用せず、Exchangeに名前をつけてpublishする場合は
exchange = channel.fanout('logs')
exchange.publish(message)
とする
Temporary queues
一時的なキューを作成できる。
Consumerがキューに接続する時に空の名前('')で接続するとサーバがランダムな名前(amq.gen.xxx)のキューを自動で作成する。
queue = channel.queue('', exclusive: true)
また、exclusive: true
にすると排他的なキューとなり他のConsumerは接続できない、またコネクション切断するとキューを自動で削除される。
クライアント固有の一時的なキューとして利用する。
Bindings
ExchangeとQueueをBindingすることでそのQueueにメッセージが配送できる。
queue.bind('logs')
Bindings一覧を表示
rabbitmqctl list_bindings
やってるみる
複数のコンシューマで起動する
bundle exec ruby receive_logs.rb
この状態でexchange、bindings確認する
[root@rabbitmq1 rabbitmq]# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.headers headers
amq.rabbitmq.trace topic
amq.match headers
amq.topic topic
amq.direct direct
amq.fanout fanout
direct
logs fanout
[root@rabbitmq1 rabbitmq]#
[root@rabbitmq1 rabbitmq]# rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange amq.gen-8F4CKb-sgjOFawbXfM-nWg queue amq.gen-8F4CKb-sgjOFawbXfM-nWg []
exchange amq.gen-D1bL9vVau8mpVubEXwR9pA queue amq.gen-D1bL9vVau8mpVubEXwR9pA []
logs exchange amq.gen-8F4CKb-sgjOFawbXfM-nWg queue []
logs exchange amq.gen-D1bL9vVau8mpVubEXwR9pA queue []
[root@rabbitmq1 rabbitmq]#
logsという名前のExchangeがあって、amq.gen-8F...
とamq.gen-D1...
にbindingされてる
publishしてみる
bundle exec ruby emit_log.rb
起動してるコンシューマにメッセージがとどく
[vagrant@rabbitmq1 bunny]$ bundle exec ruby receive_logs.rb
[*] Waiting for logs. To exit press CTRL+C
[x] Hello World!