プログラミング ミドルウェア

RabbitMQチュートリアル Part3

更新日:

要約

Publish/Subsribe

複数のコンシューマに対してメッセージを配信する方法を説明する。
このパターンは一般的に Publish/Subsribe と呼ばれている。

Publishされたメッセージを全てのConsumerへブロードキャストする方法を説明する。

Exchanges

  • Producer ・・・ メッセージを送信するアプリケーション(直接キューにメッセージをPublishしない)
  • Queue ・・・ メッセージを格納する箱
  • Consumer ・・・ アプリケーションから送信されたメッセージを受信する(Queueからメッセージ取得する)
  • Exchange ・・・ Producerからのメッセージを受信しルールに従いQueueに格納する。
  • Bindings ・・・ ExchangeとQueueの関連を定義

ProducerはExchangeに対してメッセージをPublishする。
Exchangeのタイプは複数ある(direct/topic/headers/fanoutなど)。
fanoutはBindingされている複数のキューにメッセージをブロードキャストする。

Exchangeの一覧

Exchange一覧の表示

sudo rabbitmqctl list_exchanges

デフォルトでは amq.xxx(amq.fanoutなど)(名前なし)direct Exchangeが存在している。

The default exchange

下記はデフォルトのExchangeを使用して'hello'というメッセージを'hello'というキューにPublishする。

channel.default_exchange.publish('hello', routing_key: 'hello')

デフォルトのExcange使用せず、Exchangeに名前をつけてpublishする場合は

exchange = channel.fanout('logs')
exchange.publish(message)

とする

Temporary queues

一時的なキューを作成できる。
Consumerがキューに接続する時に空の名前('')で接続するとサーバがランダムな名前(amq.gen.xxx)のキューを自動で作成する。

queue = channel.queue('', exclusive: true)

また、exclusive: trueにすると排他的なキューとなり他のConsumerは接続できない、またコネクション切断するとキューを自動で削除される。
クライアント固有の一時的なキューとして利用する。

Bindings

ExchangeとQueueをBindingすることでそのQueueにメッセージが配送できる。

queue.bind('logs')

Bindings一覧を表示

rabbitmqctl list_bindings

やってるみる

複数のコンシューマで起動する

bundle exec ruby receive_logs.rb

この状態でexchange、bindings確認する

[root@rabbitmq1 rabbitmq]# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.headers headers
amq.rabbitmq.trace  topic
amq.match   headers
amq.topic   topic
amq.direct  direct
amq.fanout  fanout
    direct
logs    fanout
[root@rabbitmq1 rabbitmq]# 
[root@rabbitmq1 rabbitmq]# rabbitmqctl list_bindings
Listing bindings for vhost /...
    exchange    amq.gen-8F4CKb-sgjOFawbXfM-nWg  queue   amq.gen-8F4CKb-sgjOFawbXfM-nWg  []
    exchange    amq.gen-D1bL9vVau8mpVubEXwR9pA  queue   amq.gen-D1bL9vVau8mpVubEXwR9pA  []
logs    exchange    amq.gen-8F4CKb-sgjOFawbXfM-nWg  queue       []
logs    exchange    amq.gen-D1bL9vVau8mpVubEXwR9pA  queue       []
[root@rabbitmq1 rabbitmq]# 

logsという名前のExchangeがあって、amq.gen-8F...amq.gen-D1...にbindingされてる

publishしてみる

bundle exec ruby emit_log.rb 

起動してるコンシューマにメッセージがとどく

[vagrant@rabbitmq1 bunny]$ bundle exec ruby receive_logs.rb    
 [*] Waiting for logs. To exit press CTRL+C                    
 [x] Hello World!                                              

-プログラミング, ミドルウェア
-,

Copyright© 明日から頑張ります。 , 2025 All Rights Reserved Powered by STINGER.