Python talks to RabbitMQ through the pika module, which can be installed with sudo pip3 install pika.
Simple send/receive model
send.py
import pika
# producer
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare('hello')
channel.basic_publish(exchange='', routing_key='hello',body='Hello World!')
print('[x] sent "Hello World!"')
connection.close()
# Run rabbitmqctl list_queues on the command line to inspect the queues
receive.py
import pika
# receive
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
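# Declare the queue here as well; otherwise the receiver fails if it starts before the sender
channel.queue_declare(queue='hello')
# Callback that handles each message taken from the queue
def callback(ch, method, properties, body):
    print("[x] Received %r" % body.decode())
# pika 0.x signature; pika 1.0+ expects basic_consume(queue=..., on_message_callback=..., auto_ack=...)
channel.basic_consume(callback, queue='hello', no_ack=True)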
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Work queue model
The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to the queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window.
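As a rough illustration of how tasks end up shared between workers, the sketch below simulates round-robin dispatch in plain Python with no broker involved. The helper `dispatch_round_robin` and the worker names are hypothetical; a real RabbitMQ broker performs the dispatch internally.

```python
from itertools import cycle

def dispatch_round_robin(tasks, workers):
    """Assign each task to the next worker in turn (hypothetical helper)."""
    assignment = {worker: [] for worker in workers}
    # cycle() repeats the worker list forever; zip() stops when tasks run out
    for task, worker in zip(tasks, cycle(workers)):
        assignment[worker].append(task)
    return assignment

shares = dispatch_round_robin(["t1", "t2", "t3", "t4", "t5"],
                              ["worker-a", "worker-b"])
print(shares)  # {'worker-a': ['t1', 't3', 't5'], 'worker-b': ['t2', 't4']}
```

Each worker receives roughly the same number of tasks regardless of how long each task takes, which is why the worker script further down also sets a prefetch limit.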
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
Manual message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.
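The ack and re-queue behaviour described above can be modelled with a small in-memory queue. This is only a toy sketch: the class `ToyQueue` and its methods are made up for illustration and are not part of pika or RabbitMQ.

```python
from collections import deque

class ToyQueue:
    """In-memory stand-in for a broker queue with manual acks (illustration only)."""

    def __init__(self):
        self.ready = deque()   # messages waiting for delivery
        self.unacked = {}      # delivery_tag -> message handed to a consumer
        self._next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self):
        """Hand the next ready message to a consumer and track it until acked."""
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = body
        return tag, body

    def ack(self, tag):
        """The consumer finished the work, so the message can be forgotten."""
        del self.unacked[tag]

    def consumer_died(self):
        """The consumer vanished without acking: put its messages back in order."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))

q = ToyQueue()
q.publish("task-1")
q.publish("task-2")

tag1, _ = q.deliver()
q.ack(tag1)            # task-1 is done and removed
tag2, _ = q.deliver()
q.consumer_died()      # task-2 was never acked, so it is re-queued
print(list(q.ready))   # ['task-2']
```

The `unacked` dictionary is also what grows without bound when a worker forgets to ack, which is the memory problem mentioned later in this section.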
new_task.py
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
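# Declare the queue as durable so it survives a broker restart.
# RabbitMQ does not allow redeclaring an existing queue name with different parameters.
channel.queue_declare(queue='task_queue', durable=True)
# Join all command-line arguments; fall back to a default when none are given
message = ' '.join(sys.argv[1:]) or 'Hello World!'
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(delivery_mode=2))  # make the message persistent
print('[x] Sent %r' % message)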
connection.close()
worker.py
import pika,time
connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost'))
channel = connection.channel()
channel.queue_declare(queue= 'task_queue',durable = True)
print('[*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print('[x] Received %r from queue' % body)
    time.sleep(body.count(b'.'))  # simulate work: one second per dot in the message
    print('[x] Done')
    # Send the ack manually; if it is never sent, the message is redelivered to another worker
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Tell RabbitMQ not to hand a worker more than one message at a time
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
It's a common mistake to miss the basic_ack. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.
In order to debug this kind of mistake you can use rabbitmqctl to print the messages_unacknowledged field:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Publish/subscribe model
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded? The rules for that are defined by the exchange type.
There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one, the fanout. Let's create an exchange of that type, and call it logs.
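To make the fanout rule concrete, here is a minimal in-memory sketch. `ToyFanoutExchange` is a hypothetical class and plain Python lists stand in for queues; it only mirrors the behaviour described above and does not talk to a broker.

```python
class ToyFanoutExchange:
    """Copies every message to all bound queues (illustration only)."""

    def __init__(self):
        self.queues = []

    def bind(self, queue):
        self.queues.append(queue)

    def publish(self, body, routing_key=""):
        # A fanout exchange ignores the routing key entirely
        for queue in self.queues:
            queue.append(body)
        return len(self.queues)  # number of queues that received the message

logs = ToyFanoutExchange()
delivered = logs.publish("nobody is listening yet")  # no bindings, so it is discarded

q1, q2 = [], []
logs.bind(q1)
logs.bind(q2)
logs.publish("info: Hello World!")
print(delivered, q1, q2)  # 0 ['info: Hello World!'] ['info: Hello World!']
```

The first publish shows why the producer cannot know whether a message reached any queue: with no bindings the exchange simply drops it.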
emit_log.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
receive_log.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')
# Let the broker create a temporary queue with a generated name, private to this connection
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
                   queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
List all exchanges:
sudo rabbitmqctl list_exchanges
List all bindings:
sudo rabbitmqctl list_bindings
Routing model
Direct exchange
Our logging system from the previous section broadcasts all messages to all consumers. We want to extend that to allow filtering messages based on their severity. For example, we may want the script that writes log messages to disk to receive only critical errors, and not waste disk space on warning or info log messages.
We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
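The exact-match rule can be sketched in a few lines of plain Python. `ToyDirectExchange` is a hypothetical stand-in with lists acting as queues; it models the routing rule only and is not how pika or the broker is implemented.

```python
from collections import defaultdict

class ToyDirectExchange:
    """Routes a message to queues whose binding key equals the routing key."""

    def __init__(self):
        self.bindings = defaultdict(list)  # binding key -> list of queues

    def bind(self, queue, binding_key):
        self.bindings[binding_key].append(queue)

    def publish(self, routing_key, body):
        # Only an exact match counts; unmatched messages are dropped
        for queue in self.bindings.get(routing_key, []):
            queue.append(body)

disk_log, console_log = [], []
exchange = ToyDirectExchange()
exchange.bind(disk_log, "error")                 # disk only wants errors
for severity in ("info", "warning", "error"):    # console wants everything
    exchange.bind(console_log, severity)

exchange.publish("info", "service started")
exchange.publish("error", "disk full")
print(disk_log)     # ['disk full']
print(console_log)  # ['service started', 'disk full']
```

Binding one queue under several keys, as the console queue does here, is the same trick the receiving script below uses with one queue_bind call per severity.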
emit_logs_direct.py
import pika,sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# First argument is the severity; everything after it is the message text
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print('[x] sent %r %r' % (severity, message))
connection.close()
receive_logs_direct.py
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
# Create one binding per severity we are interested in
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print('[x] %r %r' % (method.routing_key, body))
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
Topic model
Topic exchange
Messages sent to a topic exchange can't have an arbitrary routing_key - it must be a list of words, delimited by dots. The words can be anything, but usually they specify some features connected to the message. A few valid routing key examples: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit". There can be as many words in the routing key as you like, up to the limit of 255 bytes.
The binding key must also be in the same form. The logic behind the topic exchange is similar to a direct one - a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. However there are two important special cases for binding keys:
- * (star) can substitute for exactly one word.
- # (hash) can substitute for zero or more words.
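The two wildcard rules can be checked with a small matcher. This is a sketch of the matching rule as stated above, not RabbitMQ's actual implementation; the function name `topic_matches` is hypothetical.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key satisfies binding_key under the * and # rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        if p == len(pattern):
            # Pattern exhausted: succeed only if every word was consumed too
            return w == len(words)
        if pattern[p] == "#":
            # '#' absorbs zero words, or one word and then tries again
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w < len(words) and pattern[p] in ("*", words[w]):
            # '*' or a literal word consumes exactly one word
            return match(p + 1, w + 1)
        return False

    return match(0, 0)

print(topic_matches("stock.*.nyse", "stock.usd.nyse"))    # True
print(topic_matches("*.orange.*", "quick.orange.rabbit")) # True
print(topic_matches("nyse.*", "nyse"))                    # False: * needs one word
print(topic_matches("nyse.#", "nyse"))                    # True: # may match zero words
```

The last two calls show the practical difference between the wildcards: only # tolerates a missing word.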
emit_log_topic.py
import pika,sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange= 'topic_logs', exchange_type='topic')
rk = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=rk,body=message)
print('[x] sent %r %r' % (rk,message))
connection.close()
receive_logs_topic.py
import pika,sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host= 'localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])
    sys.exit(1)
# Bind the queue once per pattern, e.g. "*.error" or "kern.#"
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print("[x] %r %r" % (method.routing_key, body))
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
RPC model
Our RPC will work like this:
- When the Client starts up, it creates an anonymous exclusive callback queue.
- For an RPC request, the Client sends a message with two properties: reply_to, which is set to the callback queue, and correlation_id, which is set to a unique value for every request.
- The request is sent to an rpc_queue queue.
- The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
- The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request, it returns the response to the application.
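The flow above can be traced without a broker by using two in-memory queues. Everything in this sketch is hypothetical scaffolding: the function names, the dictionary message format, and the placeholder job (squaring the number) are chosen only for illustration.

```python
import uuid
from collections import deque

rpc_queue = deque()       # stands in for the 'rpc_queue' request queue
callback_queue = deque()  # stands in for the client's exclusive callback queue

def client_send(n):
    """Publish a request carrying reply_to and a fresh correlation_id."""
    corr_id = str(uuid.uuid4())
    rpc_queue.append({"reply_to": callback_queue,
                      "correlation_id": corr_id,
                      "body": str(n)})
    return corr_id

def server_handle_one():
    """Take one request, do the job, and reply on the reply_to queue."""
    request = rpc_queue.popleft()
    result = int(request["body"]) ** 2  # placeholder job: square the number
    request["reply_to"].append({"correlation_id": request["correlation_id"],
                                "body": str(result)})

def client_receive(corr_id):
    """Return the reply matching corr_id, discarding unrelated messages."""
    while callback_queue:
        reply = callback_queue.popleft()
        if reply["correlation_id"] == corr_id:
            return int(reply["body"])
    return None

# An unrelated leftover reply shows why the correlation_id check matters
callback_queue.append({"correlation_id": "stale-id", "body": "999"})

corr_id = client_send(7)
server_handle_one()
answer = client_receive(corr_id)
print(answer)  # 49
```

Without the correlation_id comparison the client would have returned the stale 999 instead of the answer to its own request.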
rpc_server.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    # Reply on the queue named in reply_to, echoing the request's correlation_id
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
rpc_client.py
import pika
import uuid
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        # Anonymous exclusive queue that receives the replies
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
    def on_response(self, ch, method, props, body):
        # Keep only the reply whose correlation_id matches our request
        if self.corr_id == props.correlation_id:
            self.response = body
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        # Process network events until the reply arrives
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)