要約
Round-robin dispatching
1つのキューに複数のコンシューマが接続している場合、デフォルトではRabbitMQは順番にメッセージをコンシューマに送信していく。
Message acknowledgment
コンシューマが何かしら処理の途中で死んだ時に、他のコンシューマへメッセージを再配送するための仕組み。
デフォルトでは、RabbitMQはメッセージ配送したらそのメッセージは即削除するが、その場合、コンシューマでメッセージ受信後、何かしら処理してる最中に障害発生した場合、別コンシューマで処理を再実行するということができない為、そのメッセージは消失してしまう。
コンシューマでメッセージ受信後にmanual_ack: true
にすることで、RabbitMQはメッセージ配送後、対象のメッセージをunackにする。
コンシューマがメッセージ受信後、ackを明示的にRabbitMQに返すことで、RabbitMQはそのメッセージを削除することができる。
queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
puts " [x] Received '#{body}'"
# imitate some work
sleep body.count('.').to_i
puts ' [x] Done'
channel.ack(delivery_info.delivery_tag)
end
RabbitMQはメッセージ配送後、対象のコンシューマの接続が切れた場合(チャンネルクローズした、コネクションクローズした、TCPコネクションが切れた)に、そのメッセージを別のコンシューマへ再配送して処理を再実行させることができる。
この仕組みにより、メッセージの消失はなくなり処理の再実行が可能となる。
なお、unackのメッセージはタイムアウトして別のコンシューマへ再配送されることはない。
RabbitMQは対象のコンシューマが死んだ時のみ、別のコンシューマへ再配送される。
unackはRabbitMQのメモリを解放しないなため、より多くのメモリを消費することになる。
下記のコマンドでunackメッセージ数を確認できる
rabbitmqctl list_queues name messages_ready messages_unacknowledged
Message durability
RabbitMQサーバが死んだ場合、デフォルトではキューもメッセージもロストしてしまう。
それを防ぐ為に、
- キューを
durable: true
で宣言する
channel.queue('task_queue', durable: true)
- メッセージをPublishする時に
persistent: true
としてPublishする。
exchange.publish(message, persistent: true)
を行う必要がある。
既存のキューの設定を変更することはできない。
なお、メッセージロストを完全に保証するものではない。
メッセージ受け付けて間もないタイミングでは、まだ保存されてない可能性もありうる。
RabbitMQは全てのメッセージに対してfsync(メモリー上にあるファイルの内容をストレージデバイス上のものと同期させること)を行わない。
単にバッファメモリに書かれてるだけで、ディスクにはまだ書き込まれていない場合もありうる。
Fair dispatch
prefetcメソッドを使うことでメッセージを配信する数を制御できる。
特定のworkerだけが常に忙しいという状況を解消できる。
下記の場合、workerは一つづつしかメッセージを受けれない。
(以前のメッセージを処理して確認するまでは新しいメッセージをそのWorkerに送信しない。)
n = 1;
channel.prefetch(n);
全てのWorkerが処理中の場合はキューにメッセージがいっぱいたまる可能性があるので注意する。
その場合は、Workerの数を増やすなど検討が必要となる。
サンプルスクリプト
WorkQueues
最初のチュートリアルではキューから送受信するプログラムを書いた。
WorkQueueを作成して時間のかかる作業を複数のWorkerに配布する。
WorkQueueの主な考え方は、リソースを大量に使う作業をやめる、作業が完了するまで待たないこと。
その代わり後で行う作業をスケジュールする。
作業をメッセージとしてカプセル化してキューに送信する。
バックグランドのWorkerはキューから作業を取得してジョブを実行する。
複数のWorkerが実行されていれば、Worker間で作業が共有される。
この考え方は短いHTTPリクエストで複雑な処理が難しいWebアプリケーションで役にたつ。
-> | C1 |
| P | -> | Q | --|
-> | C2 |
Preparation
チュートリアル前半では「Hello World!」を含むメッセージを送信しました。
今度は、複雑な作業を表す文字列を送信します。
サイズ変更するイメージやレンダリングするpdfファイルのような現実的な作業はありませんので、Kernel#sleepメソッドを使用して、ビジー状態にあると見せかけてみましょう。
文字列内の点の数をその複雑さとみなします。
すべてのドットが1秒間の「仕事」を占めます。たとえば、Hello ...によって記述された偽のタスクは3秒かかります。
前述の例のsend.rbコードを少し変更して、コマンドラインから任意のメッセージを送信できるようにします。
このプログラムは作業キューにタスクをスケジュールするので、new_task.rbという名前をつけましょう:
message = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
queue.publish(message, persistent: true)
puts " [x] Sent #{message}"
私たちの古いreceive.rbスクリプトにもいくつかの変更が必要です。
メッセージ本体のすべてのドットに対して2番目の作業を偽造する必要があります。
それはキューからメッセージをポップしてタスクを実行するので、worker.rbと呼ぶことにしましょう
queue.subscribe(block: true) do |delivery_info, _properties, body|
puts " [x] Received #{body}"
# imitate some work
sleep body.count('.').to_i
puts ' [x] Done'
end
私たちの偽のタスクは実行時間をシミュレートすることに注意してください。
チュートリアル1のように実行します。
$ bundle exec ruby new_task.rb aabcd b aeaf . .... . . . . . . .
[x] Sent aabcd b aeaf . .... . . . . . . .
$
$ bundle exec ruby worker.rb
[*] Waiting for messages. To exit press CTRL+C
[x] Received aabcd b aeaf . .... . . . . . . .
Round-robin dispatching
タスクキューを使用する利点の1つは、作業を簡単に並列化できることです。
私たちが仕事のバックログを構築しているならば、簡単に拡大することができます。
同時に2つのworker.rbを実行してみる。
どちらもキューからメッセージを取得する?
3つのコンソールを開く必要がある。
worker.rbを2つ実行する。
consumer2つ、C1とC2とする。
$ bundle exec ruby worker.rb
[*] Waiting for messages. To exit press CTRL+C
$ bundle exec ruby worker.rb
[*] Waiting for messages. To exit press CTRL+C
3つめは新しいタスクをpulishする。
コンシューマを起動してたらメッセージを公開できる。
$ bundle exec ruby new_task.rb First message.
[x] Sent First message.
$ bundle exec ruby new_task.rb Second message..
[x] Sent Second message..
$ bundle exec ruby new_task.rb Third message...
[x] Sent Third message...
$ bundle exec ruby new_task.rb Fourth message....
[x] Sent Fourth message....
$ bundle exec ruby new_task.rb Fifth message.....
[x] Sent Fifth message.....
workerに何が出力されるか見てみる。
$ bundle exec ruby worker.rb
[*] Waiting for messages. To exit press CTRL+C
[x] Received First message.
[x] Done
[x] Received Third message...
[x] Done
[x] Received Fifth message.....
[x] Done
$ bundle exec ruby worker.rb
[*] Waiting for messages. To exit press CTRL+C
[x] Received Second message..
[x] Done
[x] Received Fourth message....
[x] Done
デフォルトでは、RabbitMQは順番に次のコンシューマへメッセージを送信する。
コンシューマは常に平均的に同じ数のメッセージを取得する。
このメッセージ受信の方法はラウンドロビンと呼ばれる。
さらに3つ以上workerを増やしてみてください。
Message acknowledgment
タスク実行に数秒かかることがある。
一つのコンシューマが長いタスク始めた、部分的にしか行われず死んだ場合、何が起きるか疑問に思う。
今のコードでは、メッセージを届けるとRabbitMQはすぐに削除マークをつけてしまう。
このケースでは、workerを殺した場合、処理してたメッセージは失われてしまう。
特定のworkerに送信されたが、まだ処理されていない全てのメッセージが失われてしまう。
けど、どのタスクも失わせたくない。
workerが死んだ場合、他のworkerにタスクを届けたい。
メッセージが決して失われないために、RabbitMQはメッセージ確認(acknowledgments)をサポートしてる。
メッセージを受け取ったことをRabbitMQに伝える為にコンシューマはack(nowledgement)を返送すると、RabbitMQはメッセージをいつでも削除できる。
ackを送信せずにコンシューマが死んだ場合(チャンネルクローズした、コネクションクローズした、TCPコネクションが切れた)、RabbitMQはメッセージが完全に処理されなかったことを認識する。
他のコンシューマが同じ時間にオンラインであれば、他のコンシューマに再配送する。
この方法ならworkerが死んでもメッセージをロストしない。
メッセージタイムアウトはない。
RabbitMQはコンシューマが死んだ時にメッセージ再配送する。
メッセージの処理に長い時間がかかっても問題ない。
メッセージ確認(acknowledgments)はデフォルトでoff。
:manual_ackオプションを使ってそれらをオンにする
作業が終わったらワーカーから適切な確認を送るときです。
queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
puts " [x] Received '#{body}'"
# imitate some work
sleep body.count('.').to_i
puts ' [x] Done'
channel.ack(delivery_info.delivery_tag)
end
このコードを使用するとメッセージ処理してる時にCtrl+CでWorkerを殺してもロストはない。
workerが死んだ直後に未確認(unacknowledged)メッセージが再配送される。
ackを逃すのはよくある間違いです。
それは簡単なエラーですが、結果は深刻です。
クライアントが終了すると(ランダム再配信のように見えるかもしれませんが)メッセージは再配信されますが、RackbitMQは未送信メッセージを解放できないため、より多くのメモリを消費します。
この種の間違いをデバッグするために、rabbitmqctlを使ってmessages_unacknowledgedフィールドを表示することができます:
rabbitmqctl list_queues name messages_ready messages_unacknowledged
Message durability
コンシューマが死んでもタスクはロストしない方法を学んだ。
しかし、RabbitMQサーバが死んだ場合、タスクがロストする。
RabbitMQがクラッシュまたは終了するとキューやメッセージを忘れる。
メッセージをロストしない為に2つのことが必要。
キューとメッセージを耐久性のあるものとしてマークする。
最初に、RabbitMQは決してメッセージを失わせないようにする必要がる。
耐久性のあるものとして宣言する。
channel.queue('hello', durable: true)
このコマンドはそれだけでは正しいですが、現在の設定では機能しません.
helloというキューは既に定義されており、耐久性はありません。
RabbitMQでは、異なるパラメータを使用して既存のキューを再定義することはできません。これを実行しようとするプログラムにエラーが返されます
しかし、すばやく回避策があります。たとえば、task_queueなど、別の名前のキューを宣言しましょう。
channel.queue('task_queue', durable: true)
これは、永続的なオプションの変更をプロデューサコードとコンシューマコードの両方に適用する必要があります。
この時点で、RabbitMQが再起動してもtask_queueキューが失われないことが確実です。
今度は永続的なメッセージとしてマークする必要があります:永続的オプションBunny :: Exchange#publishが使用します。
exchange.publish(message, persistent: true)
Note
メッセージを永続的なものとしてマークすることは、メッセージが失われないことを完全に保証するものではありません。
RabbitMQは、メッセージをディスクに保存するように指示していますが、RabbitMQがメッセージを受け付けてまだ保存していない場合は、短時間のウィンドウが表示されます。
また、RabbitMQはすべてのメッセージに対してfsync(2)を実行しません。
単にキャッシュに保存され、実際にディスクに書き込まれることはありません。
永続性の保証は強くありませんが、単純なタスクキューでは十分です。
より強力な保証が必要な場合は、Publisher Confirms
を使用できます。
Fair dispatch
あなたは望んでるように仕事が割り当てられてないように思ってるかもしれない。
このExampleでは、2つのworkerで、奇数のメッセージは重い、偶数のメッセージは軽い、1つのworkerは常に忙しい、もう一つはほとんど仕事をしていないシチュエーション。
RabbitMQはその状況をしらずに処理を割り当て続けます。
これが起こるのはRabbitMQはキューにメッセージが入った時にすぐにメッセージを割り当てるから。
コンシューマのunacknowledgedメッセージ数をみていない。ただ単にn番目のメッセージをn番目のコンシューマに割り当てるだけ。
これを無効にするために、prefetchメソッドの値を1にする。
RabbitMQに一度に複数のメッセージをworkerに与えないように指示する。
以前のメッセージを処理して確認するまでは新しいメッセージをWorkerに送信しない。
n = 1;
channel.prefetch(n);
Note
すべてのWorkerがビジー状態の場合、キューがいっぱいになる。
それに注意を払い、もっと多くのWorker追加したり、他の戦略をとってみる。
上記すべてをまとめたスクリプト
- new_task.rb
#!/usr/bin/env ruby
require 'bunny'
connection = Bunny.new(automatically_recover: false)
connection.start
channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)
message = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
queue.publish(message, persistent: true)
puts " [x] Sent #{message}"
connection.close
- worker.rb
#!/usr/bin/env ruby
require 'bunny'
connection = Bunny.new(automatically_recover: false)
connection.start
channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)
channel.prefetch(1)
puts ' [*] Waiting for messages. To exit press CTRL+C'
begin
queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
puts " [x] Received '#{body}'"
# imitate some work
sleep body.count('.').to_i
puts ' [x] Done'
channel.ack(delivery_info.delivery_tag)
end
rescue Interrupt => _
connection.close
end