RabbitMQ浅读

消息分发策略

当有多个消费者时,RabbitMQ将会轮流的将消息发送给消费者.这种分发消息的方式称为'round-robin'
看例子
sender.py

# encoding:utf8
__author__ = 'brianyang'

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=sys.argv[1])

print 'send done'
connection.close()

receiver.py

# encoding:utf8
__author__ = 'brianyang'

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare('hello')


def callback(ch, method, prop, body):
    print body


channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()
connection.close()

运行两个消费者

shell1$ python receiver.py
shell2$ python receiver.py

发送消息

python sender.py msg1
python sender.py msg2
python sender.py msg3
python sender.py msg4
python sender.py msg5
python sender.py msg6

生产者收到的内容为
receiver1

shell1$ msg1
shell1$ msg3
shell1$ msg5

receiver2

shell1$ msg2
shell1$ msg4
shell1$ msg6


可以看到消息是以轮询的方式发送给消费者的.

消息容错机制

客户端崩溃

引入ack确认机制.客户端每成功消费一个消息,向服务器发送一个确认消息,通知服务器这条消息已经被成功消费.服务器收到消息后,将这条消息从unacknowledged中移除,否则,服务器就会一直等待客户端发来的ack消息.当客户端出现异常时,未消费的消息被重新放到消费队列中.这样避免了客户端崩溃造成的数据丢失.
启用ack机制的消费者代码如下:

# encoding:utf8
__author__ = 'brianyang'

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare('hello1')


def callback(ch, method, prop, body):
    print body
    time.sleep(3)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()

就是要保证no_ack为False,这也是no_ack的默认值.
这里一定要显式的发送确认消息`ch.basic_ack(delivery_tag=method.delivery_tag)明确的告诉服务器消息被处理了.
查看队列中的消息数量,可以使用

brianyang@brianyang-Latitude-E5440:/home/q/title/test$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello1  6   2

当消费者在消费时被强行结束时,消息并没有丢失,只要出现可用的消费者时,消息会被重新发送.

服务端崩溃

可以通过持久化(durable)来确保通道和消息都被保存到磁盘中进行持久化,但是由于从内存写入磁盘也需要时间,如果这段时间出现故障,则这些消息也是会丢失的.所以durable是一种弱的持久化.
持久化需要在queue和message中声明.
sender.py

# encoding:utf8
__author__ = 'brianyang'

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello1', durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello1',
                      body=sys.argv[1],
                      properties=pika.BasicProperties(
                          delivery_mode=2,
                      )
)

print 'send done'
connection.close()

receiver.py

# encoding:utf8
__author__ = 'brianyang'

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare('hello1', durable=True)


def callback(ch, method, prop, body):
    print body
    time.sleep(3)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()

hello1被声明为了持久化的通道,这里不能用hello命名,因为之前已经存在了一个非持久化的通道hello,RabbitMQ不允许对一个已经声明过的通道进行重定义.
生产者在发送消息时,将消息的类型定义为delivery_mode=2,用来将消息持久化.
通过例子来看下效果,消息持久化前,也就是没有添加durable时,重启server后消息会丢失.

非持久化

添加持久化选项后,重启server后消息没有丢失.
持久化

RabbitMQ对新入列的消息进行分配,不会考虑消费者的状态,如果两个消费者一个处理能力强,一个处理能力弱,长时间下来就会造成一个消费者消息堆积,另一个消费者相对很闲,为了公平期间,可以设置每次每个消费者只分发N个任务,只有任务收到ack后,才继续分发任务.(当no_ack为True时,这个功能失效)
修改后的代码receiver.py内容不变,sender.py修改为
sender.py

# encoding:utf8
__author__ = 'brianyang'

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare('hello1', durable=True)


def callback(ch, method, prop, body):
    print body
    time.sleep(3)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()

prefetch_count=1意味着每次只为消费者分配一条消息,消费者处理完成之后,才分配新的消息

exchange 发布/订阅模型

exchange的作用好比:收发邮件时的邮件组.如果A,B在邮件组中,C不在邮件组中,那么当Z向邮件组发邮件时,只有A,B能收到,对于Z而言,并不关心邮件组里有谁,只负责向邮件组里发邮件,如果邮件组里没人,那么邮件就会被丢弃,当Z发了100封邮件后,C加入了邮件组,那么C只能收到从第101封开始的邮件,之前的邮件是看不到的.在这里Z是发布者,A,B,C是订阅着,邮件组是exchange.


盗图一张

其中P就是发布者(Z),C1,C2就是消费者(A,B,C),X就是exchange(邮件组),amq.gen-RQ6..和amq.gen_As8..就是exchange与消费者之间通信的通道(邮箱).
上代码
receiver.py

# encoding:utf8
__author__ = 'brianyang'

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='exg', queue=queue_name)


def callback(ch, method, prop, body):
    print body
    time.sleep(3)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
connection.close()

sender.py

# encoding:utf8
__author__ = 'brianyang'

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='exg', type='fanout')

channel.basic_publish(exchange='exg',
                      routing_key='',
                      body=sys.argv[1])

print 'send done'
connection.close()

下面是个例子,可以体会下与使用queue有什么不同.


exchange

直接使用channel时,消息是面对消费者的,每条消息都会等待消费者消费,而使用exchange时,消息是面对exchange的,对于是否有消费者通过channel与exchange绑定是未知的,exchange不会将消息保存起来等待消费者.

路由

使用type类型为fanout的exchange作为中转时,所有的订阅着都会收到相同的消息,假设我有两个用户,一个是vip,一个是普通用户,有些消息我只想发给vip,不想让普通用户也收到,这时候就要学习下路由功能.

首先看下代码
sender.py

# encoding:utf8
__author__ = 'brianyang'

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='exchange', type='direct')

channel.basic_publish(exchange='exchange',
                      routing_key=sys.argv[1],
                      body=sys.argv[2])

print 'send done'
connection.close()

receiver.py

# encoding:utf8
__author__ = 'brianyang'

import pika
import time
import sys

exg_types = sys.argv[1:]

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for exg_type in exg_types:
    channel.queue_bind(exchange='exchange', queue=queue_name, routing_key=exg_type)


def callback(ch, method, prop, body):
    print method.routing_key
    print body
    time.sleep(1)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
connection.close()

在receiver.py中,绑定exchange与queue的操作中多了一个routing_key=exg_type,这里routing_key就是一个路由标识,只有当sender使用basic_publish指定的routing_key等于这个routing_key时,消息才会通过exchange发送到该queue中,同时要注意,声明exchange的type也变为了direct而不是之前的fanout.从字面上也很容易理解,之前的方式是广播,现在的方式是直连.
继续盗图

routing_pic

通过演示看下效果
routing

更加复杂的路由

当exchange的type为direct时,通过判断绑定的routing_key与发送的routing_key是否相等来判断应该将消息放入到哪个channel中.这是最简单的一种匹配方式,设想有这样一种场景,公司给员工通过队列发送消息,员工分为程序猿,前端喵和产品狗,同时员工又分为不同的级别:初级,中级和高级,员工又有不同的性别,公司对每种类别的员工的消息也不同,例如对于初级女前端,公司的祝福语为:前端的萌妹子感谢你在公司1年的付出,对于高级男程序猿,公司的祝福语为:后端的屌丝男感谢你在公司3年的付出.
对于这种维度更加广的路由,可以使用Topics. 使用Topics也很简单,首先将exchange的type变为topic.
topics支持通配符,如下:

  • * (star) can substitute for exactly one word. 使用*表示一个单词
  • # (hash) can substitute for zero or more words. 使用#表示0个或多个单词
    routing_key的定义必须遵循 - 由一系列英文逗号分割的单词组成
    例如上面的高级男程序猿就可以定义为:high.man.monkey,初级女前端喵定义为:low.women.cat
    如果一个channel的routing_key为
  • high.# : 接收向所有的高级员工发送的信息
  • high.*.monkey : 接收向所有的高级程序猿发送的信息
  • ..dog : 向所有产品狗发送的信息

原谅我这么粗鲁的称呼,这只是俚语!

通过topic的方式,可以更加精准的控制路由
继续盗图:


topic

代码如下:
sender.py

# encoding:utf8
__author__ = 'brianyang'

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='exchange', type='topics')

channel.basic_publish(exchange='exchange',
                      routing_key=sys.argv[1],
                      body=sys.argv[2])

print 'send done'
connection.close()

receiver.py代码不变,看下演示:


topic

实现RPC

RabbitMQ将消息放在消息队列中,可以方便的实现生产者-消费者模型.而RPC(远程过程调用)是一种构建SOA非常关键的技术,即面向服务架构.服务可以分布在集群中,通过增减机器可以方便的扩展服务的处理能力.
RabbitMQ实现RPC的原理就是,请求服务的应用将请求参数放入到请求队列中,同时传递一个回调队列和唯一id,回调队列用来存放服务方的计算结果,唯一id用来识别是客户端的哪一次请求,需要保证唯一性.
服务端在请求队列中获取到消息后,进行计算,计算结束后将结果放入请求方给的回调队列中,同时传回唯一id.
盗图


RPC_pic

首先看下代码
customer.py

# encoding:utf8

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.queue_declare('service_queue')


def Fib(n):
    if n == 1 or n == 2:
        return n
    return Fib(n - 1) + Fib(n - 2)


def Cal(ch, method, props, body):
    num = int(body)
    print 'cal {}'.format(num)
    resp = Fib(num)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(resp)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(Cal, queue='service_queue')
channel.start_consuming()

sender.py

# encoding:utf8

import pika
import uuid
import sys

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='service_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

num = int(sys.argv[1])
response = fibonacci_rpc.call(num)
print(" [x] Requesting fib({})".format(num))
print(" [.] Result is %r" % response)

根据官网的demo稍微修改
效果如下:

RPC

可以看到当客户端请求计算一个比较大的数的Fib数列值的时候,客户端和服务器都阻塞了,当另一个客户端请求计算时,由于没有消费者可以消费所以也阻塞了,这就好比在生产中单台服务器提供服务遇到了瓶颈,SOA架构可以方便的扩容,在这里就是又启了一个消费者,通过这种方法可以动态的改变集群的处理能力.
这些内容都在官方快速入门可以看到,我只是搬运工+汉化,加深印象,RabbitMQ tutorials

好吧,简书竟然有张图传不上去,有兴趣的来看看原文

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,353评论 2 34
  • RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息...
    彩虹之梦阅读 1,085评论 2 1
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,093评论 3 51
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 2,993评论 3 41