Publish-Subscriber¶
Một message
được gửi tới nhiều consumer
khác nhau được RabbitMQ phân phối dựa trên Publish/Subscribe
.
Publish/subscribe messaging
: Mô hình phân phối message này tương tự Pub/Sub trong Redis,Publisher
-who is the source of data
sẽ xuất bản các message theo dạngtopic
vàSubscriber
-who is the receiver of data
sẽ đăng ký các message họ cần nhận dựa trên cáctopic
này.
Publishers/Subscribes Relationship¶
-
One-to-one
hoặcunicast
: Có thể là mối quan hệ một chiều hoặc hai chiều, ví dụ One-One chat hoặc private chat. -
One-to-many
hoặcbroadcast
/fan-out
: Ví dụ Live poll, Radio, v.vv -
Many-to-many
hoặcmulticast
: Ví dụ như group chat -
Many-to-one
hoặcconsolidation
/fan-in
: Ví dụ như Sensor data live poll, audience participation.
Pub/Sub Message Flow¶
-
Publisher
tiến hành tạo mộtTopic
tạiPub/Sub Model
sau đó gửimessages
tới topic này. -
Sau đó,
Message Storage Queue
đảm bảo rằngpublished messages
được lưu lại vàdepend on
tớisubscribers
, khi nào cótime acknowledgment
thì sẽ tiến hành giải phóng message này khỏiMessage Storage Queue
đểPub/Sub
gửi đếnSubscriber
. -
Khi có
time acknowledgment
của một message thì Pub/Sub tiến hànhforwards messages
từ topic đến tất cả subscriptions của message đó. -
When a subscriber receives a message either by push operation done by Pub/Sub or Subscriber pulling from the service.
-
Message được gửi tới subscriber theo 2 kiểu: Pub/Sub push cái message tới subscriber hoặc subscriber tự pull cái message đó về.
-
Nhận được message rồi thì subscriber gửi một flags
acknowledgment
đến Pub/Sub để Pub/Sub biết rằng message đã được nhận. Khi đó nó sẽ remove cáiacknowledged message
khỏi Message Storage Queue.
Example¶
Trong ví dụ về Publish-Subscriber này, ta sẽ xây dựng một hệ thống log đơn giản gồm có 2 thành phần:
- Một chương trình sẽ phát sinh ra log
- Một chương trình để nhận log và in nó ra màn hình.
Tại một thời điểm, chúng ta sẽ tạo ra log và dùng một chương trình để tiếp nhận và ghi ra disk, một máy khác để lấy lại những log và hiển thị nó ra màn hình.
Các bản tin log sẽ được gửi đến tất cả các consumer
.
Exchange¶
Trong ví dụ này, ta dùng fanout exchange
Với fanout
, khi một message
được gửi đi, nó sẽ đẩy đến tất cả các queue
hiện có.
Temporary queues¶
Các worker
làm việc với nhau qua cùng một queue
, vì thế việc đặt tên cho queue
rất quan trọng, nó giúp chúng ta định hình được công việc mà chúng xử lý.
Trong một số trường hợp , chúng ta muốn lắng nghe tất cả các log message
hãy làm theo các bước sau:
Đầu tiên, mỗi khi kết nối tới RabbitMQ, chúng ta cần phải làm mới các queue
và tạo ra một queue
với tên ngẫu nhiên. Để tạo ra một hàng đợi tạm thời (temporary queues), chúng ta sử dụng code như sau:
Khi thực thi, RabbitMQ sẽ tạo ra 1 queue
có tên ngẫu nhiên giống như amq.gen-JzTY20BRgKO-HjmUJj0wLg
Sau đó, mỗi lần ngắt kết nối queue
sẽ bị xóa với tùy chọn exclusive
:
Bindings¶
Ở phần trên, chúng ta đã tạo ra một exchange
kiểu fanout
và một queue
. Bây giờ, chúng ta cần phải "chỉ" cho exchange
biết phải gửi message
cho queue
như thế nào bằng cách dùng binding
.
Publish-Subscriber
theo sơ đồ sau:
Chạy chương trình:
Kết quả ghi log:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.