Cách sử dụng RabbitMQ và Python's Puka để gửi thông điệp đến nhiều người tiêu dùng
Yêu cầu
RabbitMQ
Bạn chỉ có thể làm việc với RabbitMQ để gửi và nhận tin nhắn sau khi cài đặt và cấu hình phần mềm. Cách cài đặt và quản lý RabbitMQ giải thích chi tiết cách để RabbitMQ hoạt động và là điểm khởi đầu tốt để sử dụng nhà message broker này.
Thư viện Puka Python
Tất cả các ví dụ trong bài viết này được trình bày bằng cách sử dụng ngôn ngữ Python được backup với thư viện puka xử lý giao thức nhắn tin AMQP. Python đã được chọn là ngôn ngữ rõ ràng và dễ hiểu vì mục đích trình bày dễ hiểu, nhưng vì AMQP là một giao thức được chấp nhận rộng rãi, bất kỳ ngôn ngữ lập trình nào khác đều được dùng tự do để đạt được các mục tiêu tương tự.
puka có thể được cài đặt nhanh chóng bằng cách sử dụng pip
- một trình quản lý gói Python.
pip install puka
pip không phải lúc nào cũng đi kèm với các bản phân phối Linux. Trên các bản phân phối dựa trên Debian (bao gồm cả Ubuntu), nó có thể được cài đặt dễ dàng bằng cách sử dụng:
apt-get install python-pip
Trên nền tảng RHEL, như CentOS:
yum install python-setuptools
easy_install pip
Giới thiệu về RabbitMQ và thuật ngữ của nó
Nhắn tin [cụ thể là RabbitMQ ] giới thiệu một số thuật ngữ mô tả các nguyên tắc cơ bản của nhà message broker và cơ chế của nó.
Nhà production là một bên gửi tin nhắn, do đó việc tạo ra một tin nhắn là production .
Người tiêu dùng là một bên nhận được tin nhắn, do đó việc nhận được một tin nhắn là tiêu thụ.
Hàng đợi là một cache trong đó các tin nhắn đã gửi được lưu trữ và sẵn sàng nhận. Không có giới hạn về số lượng thư mà một hàng đợi có thể chứa. Cũng không có giới hạn về số lượng nhà production có thể gửi một tin nhắn đến một hàng đợi, cũng như bao nhiêu người tiêu dùng có thể cố gắng truy cập nó. Khi một thông báo chạm vào hàng đợi hiện có, nó sẽ đợi ở đó cho đến khi người tiêu dùng truy cập hàng đợi cụ thể đó sử dụng. Khi một tin nhắn chạm vào hàng đợi không tồn tại, nó sẽ bị loại bỏ.
Trao đổi là một thực thể cư trú giữa người production và hàng đợi. Nhà production không bao giờ gửi một thông điệp trực tiếp đến hàng đợi. Nó gửi tin nhắn đến một trao đổi, đến lượt nó - đặt tin nhắn vào một hoặc nhiều hàng đợi, tùy thuộc vào trao đổi được sử dụng. Để sử dụng một phép ẩn dụ trong cuộc sống thực, trao đổi giống như một người đưa thư: Nó xử lý các tin nhắn để chúng được chuyển đến các hàng đợi thích hợp (hộp thư), từ đó người tiêu dùng có thể thu thập chúng.
Ràng buộc là một kết nối giữa hàng đợi và trao đổi. Các hàng đợi ràng buộc với một sàn giao dịch nhất định được cung cấp bởi sàn giao dịch. Chính xác như thế nào phụ thuộc vào chính sàn giao dịch.
Tất cả năm thuật ngữ sẽ được sử dụng trong suốt văn bản này. Có một thư viện nữa, liên quan chặt chẽ đến thư viện puka python, được chọn làm thư viện được lựa chọn vì sự rõ ràng của nó. Nó là một lời hứa , có thể được hiểu là một yêu cầu đồng bộ tới server AMQP đảm bảo thực hiện (thành công hay không) yêu cầu và trên đó client sẽ đợi cho đến khi hoàn thành.
Mặc dù puka có thể hoạt động không đồng bộ, trong các ví dụ của ta puka sẽ được sử dụng như một thư viện đồng bộ. Điều đó nghĩa là sau mỗi yêu cầu (lời hứa) puka sẽ đợi cho đến khi nó được thực thi trước khi chuyển sang bước tiếp theo.
Thử nghiệm RabbitMQ và Puka bằng một ví dụ đơn giản
Để kiểm tra xem trình broker thư và puka có hoạt động hoàn hảo hay không và để nắm rõ cách thức hoạt động của thư gửi và nhận trong thực tế, hãy tạo một tập lệnh python mẫu có tên là rabbit_test.py
vim rabbit_test.py
và dán nội dung tập lệnh:
import puka
# declare send and receive clients, both connecting to the same server on local machine
producer = puka.Client("amqp://localhost/")
consumer = puka.Client("amqp://localhost/")
# connect sending party
send_promise = producer.connect()
producer.wait(send_promise)
# connect receiving party
receive_promise = consumer.connect()
consumer.wait(receive_promise)
# declare queue (queue must exist before it is being used - otherwise messages sent to that queue will be discarded)
send_promise = producer.queue_declare(queue='rabbit')
producer.wait(send_promise)
# send message to the queue named rabbit
send_promise = producer.basic_publish(exchange='', routing_key='rabbit', body='Server test!')
producer.wait(send_promise)
print "Message sent!"
# start waiting for messages, also those sent before (!), on the queue named rabbit
receive_promise = consumer.basic_consume(queue='rabbit', no_ack=True)
print "Starting receiving!"
while True:
received_message = consumer.wait(receive_promise)
print "GOT: %r" % (received_message['body'],)
break
Nhấn : wq để lưu file và thoát.
Việc chạy tập lệnh sẽ in thông báo được tập lệnh gửi đến hàng đợi RabbitMQ , vì chương trình thử nghiệm nhận được thông báo ngay sau đó.
Đầu ra sẽ giống như sau:
root@rabbitmq:~# python rabbit_test.py
Message sent!
Starting receiving!
GOT: 'Server test!'
root@rabbitmq:~#
Để giải thích những gì xảy ra trong đoạn mã này, ta hãy đi từng bước:
Cả người tiêu dùng và nhà production đều được tạo và kết nối với cùng một server RabbitMQ, nằm trên
localhost
Producer khai báo một hàng đợi, đảm bảo nó tồn tại khi thông báo sẽ được tạo ra. Nếu không có bước này, một hàng đợi có thể không tồn tại và do đó các thư có thể bị hủy ngay lập tức.
Nhà production gửi thông báo đến một trao đổi không tên (sẽ có thêm các trao đổi sau) với một khóa định tuyến chỉ định hàng đợi được tạo trước. Sau đó, tin nhắn sẽ đến với sàn giao dịch, đến lượt nó sẽ được đặt vào hàng đợi "thỏ". Sau đó, thông điệp sẽ nằm ở đó cho đến khi ai đó sẽ sử dụng nó.
Người tiêu dùng truy cập vào hàng đợi "thỏ" và bắt đầu nhận các tin nhắn được lưu trữ ở đó. Bởi vì có một tin nhắn đang chờ, nó sẽ được gửi ngay lập tức. Nó đã được tiêu thụ, nghĩa là nó sẽ không còn ở trong hàng đợi.
Tin nhắn đã sử dụng sẽ được in trên màn hình.
Sàn giao dịch Fanout
Trong ví dụ trước, một trao đổi không tên đã được sử dụng để gửi thông điệp đến một hàng đợi cụ thể có tên là "thỏ". Trao đổi không tên cần một tên hàng đợi để hoạt động, nghĩa là nó chỉ có thể gửi thông điệp đến một hàng đợi duy nhất.
Ngoài ra còn có các loại trao đổi khác trong RabbitMQ , một trong số đó là fanout , mối quan tâm chính của ta trong văn bản này. Trao đổi Fanout là một công cụ đơn giản, đơn giản, gửi thông điệp đến TẤT CẢ các hàng đợi mà nó biết. Với trao đổi fanout, không cần (thực tế là không thể) cung cấp một tên hàng đợi cụ thể. Các tin nhắn đạt được loại trao đổi đó được gửi đến tất cả các hàng đợi liên kết với cuộc trao đổi trước khi tin nhắn được tạo. Không có giới hạn về số lượng hàng đợi có thể được kết nối với sàn giao dịch.
Xuất bản mẫu đăng ký của Vietnam
Với trao đổi fanout, ta có thể dễ dàng tạo mẫu xuất bản / đăng ký , hoạt động giống như một công cụ mở cho tất cả các bản tin. Nhà production , một người phát bản tin, gửi thông điệp định kỳ đến khán giả mà họ thậm chí có thể không biết (tạo ra thông điệp và gửi đến sàn giao dịch fanout bản tin). Người đăng ký mới đăng ký nhận bản tin (liên kết hàng đợi riêng với cùng một fanout bản tin). Kể từ thời điểm đó, sàn giao dịch fanout bản tin sẽ gửi tin nhắn đến tất cả các thuê bao đã đăng ký (xếp hàng).
Mặc dù nhắn tin một đối một khá đơn giản và các nhà phát triển thường sử dụng các phương tiện giao tiếp khác, nhưng một-nhiều (trong đó “nhiều” không được xác định và có thể là bất kỳ thứ gì giữa ít và nhiều ) là một kịch bản rất phổ biến trong đó một nhà message broker có thể giúp ích rất nhiều.
Viết ứng dụng nhà production
Role duy nhất của ứng dụng nhà production là tạo ra một trao đổi fanout được đặt tên và gửi các thông báo định kỳ (vài giây một lần) đến trao đổi đó. Trong một kịch bản đời thực, thông điệp sẽ được tạo ra vì một lý do. Để đơn giản hóa ví dụ, thư sẽ được tạo tự động. Ứng dụng này sẽ hoạt động như một nhà xuất bản bản tin.
Tạo một tập lệnh python có tên newsletter_produce.py
vim newsletter_produce.py
và dán nội dung tập lệnh:
import puka
import datetime
import time
# declare and connect a producer
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)
# create a fanout exchange
exchange_promise = producer.exchange_declare(exchange='newsletter', type='fanout')
producer.wait(exchange_promise)
# send current time in a loop
while True:
message = "%s" % datetime.datetime.now()
message_promise = producer.basic_publish(exchange='newsletter', routing_key='', body=message)
producer.wait(message_promise)
print "SENT: %s" % message
time.sleep(1)
producer.close()
Hãy đi từng bước với ví dụ để giải thích những gì xảy ra trong mã.
Máy khách Producer được tạo và kết nối với version RabbitMQ local . Từ bây giờ nó có thể giao tiếp với RabbitMQ một cách tự do.
Một trao đổi fanout
newsletter
được đặt tên được tạo. Sau bước đó, trao đổi tồn tại trên server RabbitMQ và được dùng để liên kết hàng đợi với nó và gửi tin nhắn qua nó.Trong một vòng lặp vô tận, các thông báo với thời gian hiện tại được tạo ra cho trao đổi
newsletter
. Lưu ýrouting_key
trống, nghĩa là không có hàng đợi cụ thể nào được chỉ định. Đó là trao đổi sẽ gửi thông điệp đến các hàng đợi thích hợp hơn nữa.
Ứng dụng khi chạy sẽ thông báo thời gian hiện tại cho tất cả người đăng ký nhận bản tin.
Viết đơn đăng ký người tiêu dùng
Ứng dụng người tiêu dùng sẽ tạo một hàng đợi tạm thời và ràng buộc nó với một sàn giao dịch fanout có tên. Sau đó, nó sẽ bắt đầu chờ tin nhắn. Sau khi ràng buộc hàng đợi với trao đổi, mọi tin nhắn được gửi bởi nhà production đã tạo trước đó sẽ được nhận bởi người tiêu dùng này. Ứng dụng này sẽ hoạt động như một người đăng ký bản tin - có thể chạy ứng dụng nhiều lần cùng một lúc và tất cả các version vẫn sẽ nhận được tin nhắn quảng bá.
Tạo một tập lệnh python có tên newsletter_consume.py
vim newsletter_consume.py
và dán nội dung tập lệnh:
import puka
# declare and connect a consumer
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)
# create temporary queue
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']
# bind the queue to newsletter exchange
bind_promise = consumer.queue_bind(exchange='newsletter', queue=queue)
consumer.wait(bind_promise)
# start waiting for messages on the queue created beforehand and print them out
message_promise = consumer.basic_consume(queue=queue, no_ack=True)
while True:
message = consumer.wait(message_promise)
print "GOT: %r" % message['body']
consumer.close()
Mã người tiêu dùng phức tạp hơn một chút so với mã của nhà production . Hãy xem xét nó từng bước:
Khách hàng tiêu dùng được tạo và kết nối với version RabbitMQ local .
Một hàng đợi tạm thời được tạo. Tạm thời nghĩa là không có tên nào được cung cấp - tên hàng đợi sẽ được RabbitMQ tạo tự động . Ngoài ra, hàng đợi như vậy sẽ bị hủy sau khi client ngắt kết nối. Đó là một cách phổ biến để tạo hàng đợi chỉ tồn tại để ràng buộc với một trong các sàn giao dịch và không có mục đích đặc biệt nào khác. Vì cần phải tạo một hàng đợi để nhận bất cứ thứ gì, đó là một phương pháp thuận tiện để tránh suy nghĩ về tên hàng đợi.
Hàng đợi được tạo liên kết với trao đổi
newsletter
. Kể từ thời điểm đó, sàn giao dịch fanout sẽ gửi mọi tin nhắn đến hàng đợi đó.Trong một vòng lặp vô tận, người tiêu dùng chờ đợi trên hàng đợi, nhận mọi tin nhắn đến hàng đợi và in nó trên màn hình.
Ứng dụng khi chạy sẽ nhận được thông báo thời gian từ nhà xuất bản bản tin. Nó có thể được thực thi nhiều lần cùng một lúc và mỗi version của ứng dụng này sẽ có thời gian hiện tại.
Kiểm tra cả hai ứng dụng
Để kiểm tra nhà xuất bản bản tin và người tiêu dùng của họ, hãy mở nhiều phiên SSH tới server ảo (hoặc mở nhiều cửa sổ terminal , nếu làm việc trên máy tính local ).
Trong một trong các cửa sổ, chạy ứng dụng production .
root@rabbitmq:~# python newsletter_produce.py
Nó sẽ bắt đầu hiển thị mỗi giây vào thời điểm hiện tại:
SENT: 2014-02-11 17:24:47.309000
SENT: 2014-02-11 17:24:48.310000
SENT: 2014-02-11 17:24:49.312000
SENT: 2014-02-11 17:24:50.316000
...
Trong mọi cửa sổ khác, hãy chạy ứng dụng tiêu dùng:
root@rabbitmq:~# python newsletter_consume.py
Mỗi version của ứng dụng này sẽ nhận được thông báo thời gian do nhà production phát:
GOT: 2014-02-11 17:24:47.309000
GOT: 2014-02-11 17:24:48.310000
GOT: 2014-02-11 17:24:49.312000
GOT: 2014-02-11 17:24:50.316000
...
Nghĩa là RabbitMQ đã đăng ký sàn giao dịch fanout đúng cách, ràng buộc người đăng ký xếp hàng với sàn giao dịch này và gửi tin nhắn đã gửi đến các hàng đợi thích hợp. Nói cách khác, RabbitMQ đã hoạt động như mong đợi.
Đọc thêm
Xuất bản / đăng ký là một mẫu nhắn tin đơn giản (cả về khái niệm và cách triển khai) thường có thể hữu ích; nó không ở gần giới hạn RabbitMQ mặc dù. Có vô số cách sử dụng RabbitMQ để giải quyết các vấn đề về nhắn tin, bao gồm định tuyến tin nhắn nâng cao, xác nhận tin nhắn, bảo mật hoặc tính bền bỉ.
Mục tiêu chính của văn bản này là giới thiệu các khái niệm nhắn tin cơ bản bằng các ví dụ đơn giản. Nhiều cách sử dụng khác được đề cập chi tiết trong tài liệu chính thức của RabbitMQ , đây là một tài nguyên tuyệt vời cho user và administrator RabbitMQ.
<div class = “author”> Bài viết được gửi bởi: <a href=p>http://maticomp.net[> Mateusz Papiernik </a> </div>
Các tin liên quan