消息分发策略
当有多个消费者时,RabbitMQ将会轮流的将消息发送给消费者.这种分发消息的方式称为'round-robin'
看例子
sender.py
# encoding:utf8
__author__ = 'brianyang'
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body=sys.argv[1])
print 'send done'
connection.close()
receiver.py
# encoding:utf8
__author__ = 'brianyang'
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare('hello')
def callback(ch, method, prop, body):
print body
channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()
connection.close()
运行两个消费者
shell1$ python receiver.py
shell2$ python receiver.py
发送消息
python sender.py msg1
python sender.py msg2
python sender.py msg3
python sender.py msg4
python sender.py msg5
python sender.py msg6
生产者收到的内容为
receiver1
shell1$ msg1
shell1$ msg3
shell1$ msg5
receiver2
shell1$ msg2
shell1$ msg4
shell1$ msg6
可以看到消息是以轮询的方式发送给消费者的.
消息容错机制
客户端崩溃
引入ack确认机制.客户端每成功消费一个消息,向服务器发送一个确认消息,通知服务器这条消息已经被成功消费.服务器收到消息后,将这条消息从unacknowledged中移除,否则,服务器就会一直等待客户端发来的ack消息.当客户端出现异常时,未消费的消息被重新放到消费队列中.这样避免了客户端崩溃造成的数据丢失.
启用ack机制的消费者代码如下:
# encoding:utf8
__author__ = 'brianyang'
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare('hello1')
def callback(ch, method, prop, body):
print body
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()
就是要保证no_ack为False,这也是no_ack的默认值.
这里一定要显式的发送确认消息`ch.basic_ack(delivery_tag=method.delivery_tag)明确的告诉服务器消息被处理了.
查看队列中的消息数量,可以使用
brianyang@brianyang-Latitude-E5440:/home/q/title/test$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello1 6 2
当消费者在消费时被强行结束时,消息并没有丢失,只要出现可用的消费者时,消息会被重新发送.
服务端崩溃
可以通过持久化(durable)来确保通道和消息都被保存到磁盘中进行持久化,但是由于从内存写入磁盘也需要时间,如果这段时间出现故障,则这些消息也是会丢失的.所以durable是一种弱的持久化.
持久化需要在queue和message中声明.
sender.py
# encoding:utf8
__author__ = 'brianyang'
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello1', durable=True)
channel.basic_publish(exchange='',
routing_key='hello1',
body=sys.argv[1],
properties=pika.BasicProperties(
delivery_mode=2,
)
)
print 'send done'
connection.close()
receiver.py
# encoding:utf8
__author__ = 'brianyang'
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare('hello1', durable=True)
def callback(ch, method, prop, body):
print body
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()
hello1被声明为了持久化的通道,这里不能用hello命名,因为之前已经存在了一个非持久化的通道hello,RabbitMQ不允许对一个已经声明过的通道进行重定义.
生产者在发送消息时,将消息的类型定义为delivery_mode=2
,用来将消息持久化.
通过例子来看下效果,消息持久化前,也就是没有添加durable时,重启server后消息会丢失.
添加持久化选项后,重启server后消息没有丢失.
RabbitMQ对新入列的消息进行分配,不会考虑消费者的状态,如果两个消费者一个处理能力强,一个处理能力弱,长时间下来就会造成一个消费者消息堆积,另一个消费者相对很闲,为了公平期间,可以设置每次每个消费者只分发N个任务,只有任务收到ack后,才继续分发任务.(当no_ack为True时,这个功能失效)
修改后的代码receiver.py内容不变,sender.py修改为
sender.py
# encoding:utf8
__author__ = 'brianyang'
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare('hello1', durable=True)
def callback(ch, method, prop, body):
print body
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()
prefetch_count=1
意味着每次只为消费者分配一条消息,消费者处理完成之后,才分配新的消息
exchange 发布/订阅模型
exchange的作用好比:收发邮件时的邮件组.如果A,B在邮件组中,C不在邮件组中,那么当Z向邮件组发邮件时,只有A,B能收到,对于Z而言,并不关心邮件组里有谁,只负责向邮件组里发邮件,如果邮件组里没人,那么邮件就会被丢弃,当Z发了100封邮件后,C加入了邮件组,那么C只能收到从第101封开始的邮件,之前的邮件是看不到的.在这里Z是发布者,A,B,C是订阅着,邮件组是exchange.
其中P就是发布者(Z),C1,C2就是消费者(A,B,C),X就是exchange(邮件组),amq.gen-RQ6..和amq.gen_As8..就是exchange与消费者之间通信的通道(邮箱).
上代码
receiver.py
# encoding:utf8
__author__ = 'brianyang'
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='exg', queue=queue_name)
def callback(ch, method, prop, body):
print body
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
connection.close()
sender.py
# encoding:utf8
__author__ = 'brianyang'
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='exg', type='fanout')
channel.basic_publish(exchange='exg',
routing_key='',
body=sys.argv[1])
print 'send done'
connection.close()
下面是个例子,可以体会下与使用queue有什么不同.
直接使用channel时,消息是面对消费者的,每条消息都会等待消费者消费,而使用exchange时,消息是面对exchange的,对于是否有消费者通过channel与exchange绑定是未知的,exchange不会将消息保存起来等待消费者.
路由
使用type类型为fanout的exchange作为中转时,所有的订阅着都会收到相同的消息,假设我有两个用户,一个是vip,一个是普通用户,有些消息我只想发给vip,不想让普通用户也收到,这时候就要学习下路由功能.
首先看下代码
sender.py
# encoding:utf8
__author__ = 'brianyang'
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='exchange', type='direct')
channel.basic_publish(exchange='exchange',
routing_key=sys.argv[1],
body=sys.argv[2])
print 'send done'
connection.close()
receiver.py
# encoding:utf8
__author__ = 'brianyang'
import pika
import time
import sys
exg_types = sys.argv[1:]
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for exg_type in exg_types:
channel.queue_bind(exchange='exchange', queue=queue_name, routing_key=exg_type)
def callback(ch, method, prop, body):
print method.routing_key
print body
time.sleep(1)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
connection.close()
在receiver.py中,绑定exchange与queue的操作中多了一个routing_key=exg_type
,这里routing_key就是一个路由标识,只有当sender使用basic_publish指定的routing_key等于这个routing_key时,消息才会通过exchange发送到该queue中,同时要注意,声明exchange的type也变为了direct而不是之前的fanout.从字面上也很容易理解,之前的方式是广播,现在的方式是直连.
继续盗图
通过演示看下效果
更加复杂的路由
当exchange的type为direct时,通过判断绑定的routing_key与发送的routing_key是否相等来判断应该将消息放入到哪个channel中.这是最简单的一种匹配方式,设想有这样一种场景,公司给员工通过队列发送消息,员工分为程序猿,前端喵和产品狗,同时员工又分为不同的级别:初级,中级和高级,员工又有不同的性别,公司对每种类别的员工的消息也不同,例如对于初级女前端,公司的祝福语为:前端的萌妹子感谢你在公司1年的付出,对于高级男程序猿,公司的祝福语为:后端的屌丝男感谢你在公司3年的付出.
对于这种维度更加广的路由,可以使用Topics. 使用Topics也很简单,首先将exchange的type变为topic.
topics支持通配符,如下:
-
*
(star) can substitute for exactly one word. 使用*表示一个单词 -
#
(hash) can substitute for zero or more words. 使用#表示0个或多个单词
routing_key的定义必须遵循 - 由一系列英文逗号分割的单词组成
例如上面的高级男程序猿就可以定义为:high.man.monkey,初级女前端喵定义为:low.women.cat
如果一个channel的routing_key为 - high.# : 接收向所有的高级员工发送的信息
- high.*.monkey : 接收向所有的高级程序猿发送的信息
- ..dog : 向所有产品狗发送的信息
原谅我这么粗鲁的称呼,这只是俚语!
通过topic的方式,可以更加精准的控制路由
继续盗图:
代码如下:
sender.py
# encoding:utf8
__author__ = 'brianyang'
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='exchange', type='topics')
channel.basic_publish(exchange='exchange',
routing_key=sys.argv[1],
body=sys.argv[2])
print 'send done'
connection.close()
receiver.py代码不变,看下演示:
实现RPC
RabbitMQ将消息放在消息队列中,可以方便的实现生产者-消费者模型.而RPC(远程过程调用)是一种构建SOA非常关键的技术,即面向服务架构.服务可以分布在集群中,通过增减机器可以方便的扩展服务的处理能力.
RabbitMQ实现RPC的原理就是,请求服务的应用将请求参数放入到请求队列中,同时传递一个回调队列和唯一id,回调队列用来存放服务方的计算结果,唯一id用来识别是客户端的哪一次请求,需要保证唯一性.
服务端在请求队列中获取到消息后,进行计算,计算结束后将结果放入请求方给的回调队列中,同时传回唯一id.
盗图
首先看下代码
customer.py
# encoding:utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare('service_queue')
def Fib(n):
if n == 1 or n == 2:
return n
return Fib(n - 1) + Fib(n - 2)
def Cal(ch, method, props, body):
num = int(body)
print 'cal {}'.format(num)
resp = Fib(num)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(resp)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(Cal, queue='service_queue')
channel.start_consuming()
sender.py
# encoding:utf8
import pika
import uuid
import sys
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='service_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
num = int(sys.argv[1])
response = fibonacci_rpc.call(num)
print(" [x] Requesting fib({})".format(num))
print(" [.] Result is %r" % response)
根据官网的demo稍微修改
效果如下:
可以看到当客户端请求计算一个比较大的数的Fib数列值的时候,客户端和服务器都阻塞了,当另一个客户端请求计算时,由于没有消费者可以消费所以也阻塞了,这就好比在生产中单台服务器提供服务遇到了瓶颈,SOA架构可以方便的扩容,在这里就是又启了一个消费者,通过这种方法可以动态的改变集群的处理能力.
这些内容都在官方快速入门可以看到,我只是搬运工+汉化,加深印象,RabbitMQ tutorials
好吧,简书竟然有张图传不上去,有兴趣的来看看原文把