消息队列之RabbitMQ《一》

经常听人说到消息队列。顾名思义,消息队列,处理的对象是消息,而队列是先进先出。我们随随便便的一个请求,可能涉及到多个服务,服务之间需要互相通信,那就是消息。消息队列是一种进程之间通信或者同一进程不同线程之间的通信方式。主要解决应用的耦合、异步消息、流量削峰。
虽然kafka很有名,但是rabbitMQ的文档更好看,对初学者很友好。

学习消息队列,我们首先要了解2个东西,生产者(Produce)和消费者(Consumer)。生产者发送消息,消费者消费消息。


Producer发送消息hello队列,然后Consumer从hello队列去获取消息

RabbitMQ是一套开源(MPL)的消息队列服务软件,它实现了高级消息队列协议(AMQP)。它提供server服务,同时支持多种语言的客户端连接。
首先我们要确定自己需要将rabbitMQ的Server服务部署在哪台(或哪几台)机器上。然后我在那台机器上下载安装rabbitMQ或者使用docker启动 。我在我本机上(mac)用docker启动。


截屏2022-04-08 下午6.14.28.png

docker命令启动如下:

# for RabbitMQ 3.9, the latest series
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

rabbitMQ服务起来之后,要使用它上面的消息队列。我们这边需要与之进行连接通讯。python官方推荐使用pika。所以我们需要下载pika

pip install pika

一切准备就绪,我们可以开始了。首先是我们的生产者producer。它需要:

1、与rabbitMQ建立连接。创建一个pika. BlockingConnection的连接对象,并创建获取一个与之通信的channel。
2、使用queue_declare声明一个队列,如果需要则创建。因为我们发送消息之前必须确保相应的队列是存在的,如果不存在,则rabbitMQ会把该消息丢弃掉。
3、通过channel发送消息到rabbitMQ Server。通常情况下,消息都不会直接发送到队列,而是会经过exchange交换机。但是RabbitMQ提供了一个默认的exchange,当使用默认的exchange时,我们可以通过routing_key直接指定发送给哪个队列。
具体代码如下:sender.py

import pika

#与broker建立连接,默认端口是5672
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建一个新的channel,可以指定channel_num,未指定的话由系统分配一个有效的。
channel = connection.channel()

#声明一个"hello"队列,如果不存在,有需要就创建
channel.queue_declare(queue='hello')

#发送5次Hello World消息
for i in range(5):
#exchange设置为' '表示使用默认的exchange,通过routing_key指定直接发送到哪个队列,routing_key的值即为queue的名称。
    channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!'+str(i))
    print("[X] Sent 'Hello World!"+str(i))

#关闭连接   
connection.close()

然后我们再看下消费者怎么消费队列的消息:
receive.py

import sys
import os
import pika

#定义一个收到消息后的回调函数,该函数需要4个参数:
# - channel: BlockingChannel,表示所在的channel
# - method: spec.Basic.Deliver
# - properties: spec.BasicProperties
# - body: bytes,表示收到的消息
def callback(ch,method,properties,body):
    print("[X] Received %r" %body)

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    #同样,声明一个队列,如果不存在就创建。
    channel.queue_declare(queue='hello')

    #调用basic_consume,指定queue和回调函数,接受消息
    # auto_ack: 自动确认消息
    channel.basic_consume(queue='hello',
                          auto_ack=True,
                          on_message_callback=callback)

    print("[*] Waiting for message.To exit press Ctrl+C")
    channel.start_consuming()



if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

准备就绪,我们依次启动receive.py、sender.py,运行结果如下:


sender.png
receive.png

上面我们是先启动receive,等待着,然后再启动sender.如果我们先启动sender,再启动receive,打印也还是同上,只是不是发一个取一个了,而是发5个,取5个。也就是只要发送出去的,存在队列里,没有人来取的话,就在队列里等着别人来取。
当然,有时我们可能还不止一个消费者,


多个消费者

我们可以开启2个receive同时接收消息,那么它们会依次取出队列的消息,不重复。我们发送10个消息,打印如下:

send:
[X] Sent 'Hello World!0
[X] Sent 'Hello World!1
[X] Sent 'Hello World!2
[X] Sent 'Hello World!3
[X] Sent 'Hello World!4
[X] Sent 'Hello World!5
[X] Sent 'Hello World!6
[X] Sent 'Hello World!7
[X] Sent 'Hello World!8
[X] Sent 'Hello World!9

receive1:
[X] Received b'Hello World!0'
[X] Received b'Hello World!2'
[X] Received b'Hello World!4'
[X] Received b'Hello World!6'
[X] Received b'Hello World!8'

receive2:
[X] Received b'Hello World!1'
[X] Received b'Hello World!3'
[X] Received b'Hello World!5'
[X] Received b'Hello World!7'
[X] Received b'Hello World!9'

好了,以上我们就完成了一个简单的消息队列的使用了!
接下来,我们再看两个参数:auto_ackx-single-active-consumer

auto_ack:自动确认消息标志。默认为False。上面我们设置为True,实际上就是让它在收到消息后给rabbitMQ发送一个消息确认消息,通知它我们已经收到消息了,它可以将消息从消息队列中删除了。这样我们如果receive如果中断,这些已经被处理的消息就不在队列中了。
上面的例子中,我们的回调函数很简单,只是打印出消息。但是大部分情况下,我们收到消息后,都需要做一些处理,甚至有些是耗时的密集型处理,可能需要稍等几秒。这时我们就不能使用auto_ask了,因为万一该任务突然中断,该消息还没处理完,我们就直接将其从消息队列中删掉了,我们就没办法完整地完成这条消息的处理了。我们不想丢掉任何一条消息。我们只想在我们已经处理完该条消息了之后再给rabbitMQ发送确认消息。这时,就需要用到basic_ack。在消息处理完成之后,调用:ch.basic_ack(delivery_tag=method.delivery_tag),手动发送确认消息。

如果我们忘记调用basic_ack,消息就会一直存在消息队列中吗?那倒也不是,rabbitMQ有一个默认的消息确认超时时间:30分钟。超过30分钟未收到确认,它就会因PRECONDITION_FAILED而异常关闭channel,然后再这个channel上的所有消费者,都将重新队列。
这个时间在rabbitmq.conf可定义:

# 30 minutes in milliseconds
consumer_timeout = 1800000

同时你还可以在advanced.config中完全禁用这个超时时间,只是不推荐使用

%% advanced.config
[
  {rabbit, [
    {consumer_timeout, undefined}
  ]}
].

x-single-active-consumer:单个激活状态的消费者。
这个是在声明队列的时候的参数,我们可以通过arguments参数,将x-single-active-consumer设置为True:

  arguments = {"x-single-active-consumer":True}
  channel.queue_declare(queue='hello1',arguments=arguments)

一旦将x-single-active-consumer设置为True,则这个队列只允许存在一个有效的消费者消费消息。上面的例子中,如果我们设置了这个参数,又启动了2个receive,则只有一个receive可以取到消息,它会取到所有的消息。
tips:如果一个队列已经创建为非x-single-active-consumer的,而你想更改其为x-single-active-consumer,上面的代码是行不通的,会报错,会提示你,声明的队列的和server上的队列不一致。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容