Python 消息中间件RabbitMQ使用

介绍

rabbitmq是基于Erlang语言编写的一种消息队列中间件,具体的内容网上有很多这里就不赘述了,本文主要介绍一下在python当中基于第三方库pika对rabbitmq的简单使用

安装

服务端
ubuntu安装参考

https://www.cnblogs.com/vipstone/p/9184314.html

centos参考

https://blog.csdn.net/zhuzhezhuzhe1/article/details/80464291

客户端
pip install pika

场景

  • 任务队列
  • 发布订阅内容
  • ...

生产者-消费者模式

在消息队列当中,最简单的就是生产者消费者模式,即生产者发布一条消息,一个或者多个消费者在监听等待,最终发布的消息被其中一个消费者给取走执行的模式,下面是简单的示例:

生产者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
# rabbitmq队列账号密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
# 连接配置:主要包括host、port(默认15672)、credentials(登录用户名密码)
channel = connection.channel()
# 创建一个连接通道
channel.queue_declare(queue='aaa')
# 声明一个名为aaa的队列
channel.basic_publish(exchange='',
                      routing_key='aaa',
                      body='this is a msg!')
# 往aaa队列发送一条消息
connection.close()
#关闭连接
消费者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
# rabbitmq队列账号密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
# 连接配置:主要包括host、port(默认5672)、credentials(登录用户名密码)
channel = connection.channel()
# 创建一个连接通道
channel.queue_declare(queue='aaa')
# 声明一个名为aaa的队列
def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))
 
channel.basic_consume('aaa', callback, auto_ack=True)
# 从aaa队列中取出一条消息,并执行对应回调,第三个参数代表取到消息后自动回复执行完成,生产者会将该任务消息删除
# 在旧版里传参顺序和参数名可能有所不同(旧版里是:callback, queue='aaa', no_ack=True)
print('start...')
channel.start_consuming()
# 开启循环监听消息队列

此时如果打开多个消费者,那么可以发现生产者的发送的消息队列将会被消费者按顺序取走

生产者消费者模式-常用配置

任务完成回复

auto_ack参数可以配置任务是否需要回复,默认是False,即任务被取走之后,只有消费者在回调当中执行了ch.basic_ack(delivery_tag=method.delivery_tag)方法以后,代表任务执行完成,此时生产者才会把任务从消息队列当中删除,若消费者没能在关闭前执行上面那句方法,那么别的消费者将会在之后取走该任务去执行,直到生产者接收到执行完成的指令为止。如果auto_ack值为True,那么当任务被取走之后,生产者将直接把队列中的任务删除。举例:

# 消费者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')

def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)
    # 回复任务完成
 
channel.basic_consume('aaa', callback, auto_ack=False)
# 设置auto_ack=False,此时消费者必须提供任务完成的回复
print('start...')
channel.start_consuming()
消息队列持久化

在队列声明时,可以通过参数durable配置是否需要持久化,默认为False,即不需要持久化,此时如果服务端挂了,那么消息队列的内容将会丢失。如果配置持久化,那么首先需要在声明当中设置durable=True,然后在发布时也配置分发模式为持久化分发,举例:

# 生产者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='bbb', durable=True)
# 声明一个消息持久化的队列,因为之前已经创建了aaa队列,并且不是持久化的队列,所以这里新建一个不存在的队列
channel.basic_publish(exchange='',
                      routing_key='bbb',
                      body="this is a durable msg!", 
                      properties=pika.BasicProperties(
                          delivery_mode=2,
                          # 配置消息持久化
                      ))
connection.close()
闲置消费

默认是按照客户端的顺序一个个循环派发任务的,但要是第一个客户端没执行完,而下一个客户端已经执行完了,此时如果还把任务派发给第一个就有些不好了,所以需要将任务派发给闲置的客户端,类似于Nginx的负载均衡,实现只需要在消费者当中加入一句代码:channel.basic_qos(prefetch_count=1),举例:

# 消费者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')

def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
# 设置闲置消费
channel.basic_consume('aaa', callback, auto_ack=False)
print('start...')
channel.start_consuming()

发布订阅模式-fanout

前面介绍的都是生产者发布一条任务消息,然后一个或者多个消费者中的其中一个取走这个任务去执行的情况。而有一种场景,如广播、微信公众号的消息推送这种,往往需要将一条消息发布给所有的消费者执行,而在rabbitmq当中就可以通过创建一个exchange交换器来创建和管理多个队列,即一个exchange下有多个队列,并且每个队列对应一个消费者,当有消息的时候,exchange会将消息发送给自己所管理的所有队列,此时需要设置类型为fanout,举例:

发布者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex1', exchange_type='fanout')
# 声明一个名为ex1的交换器,发布类型为集体分发
channel.basic_publish(exchange='ex1',
                      routing_key='',
                      body="everyone on ex1 will get this msg!"
                      )
# 往ex1中所有队列分发消息
connection.close()
订阅者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex1', exchange_type='fanout')
# 声明一个名为ex1的交换器,发布类型为集体分发
result = channel.queue_declare(queue='', exclusive=True)
# 这里不定义名字,通过exclusive=True生成一个名字不重复的队列
queue_name = result.method.queue
# 获取队列名
channel.queue_bind(queue_name, exchange='ex1')
# 绑定队列名和ex1交换器
def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))

channel.basic_consume(queue_name, callback, True)
# 使用随机生成的队列名在ex1交换器下接收消息
print('queue:{} start...'.format(queue_name))
channel.start_consuming()

注:
发布订阅模式和生产消费者模式还有一点不同就是:生产消费者模式当中,生产者产生的消费只要没被取走,那么消息就会一直留着等待被消费者取走;而在发布订阅模式当中,发布者只会发布一次,发布完该消息就会被删除,因此如果订阅者没有在发布者发布时接收到消息,将永远错过接收的机会(就像关注公众号以后,公众号并不会把以前的所有历史推送信息也给你重新再推送一遍一样)

指定发布订阅-direct

对于发布时,可以不给所有队列发送消息,而是指定给哪些队列发送,举例:

发布者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex2', exchange_type='direct')
# 声明一个名为ex2的交换器,发布类型为指定路由分发
channel.basic_publish(exchange='ex2',
                      routing_key='test1',
                      body="only test1 will get this msg!"
                      )
# 往ex2中test1的路由分发消息
channel.basic_publish(exchange='ex2',
                      routing_key='test2',
                      body="only test2 will get this msg!"
                      )
# 往ex2中test2的路由分发消息
connection.close()
订阅者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex2', exchange_type='direct')
# 声明一个名为ex2的交换器,接收类型为指定路由
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(queue_name, exchange='ex2', routing_key='test1')
# 绑定ex2与队列,以及绑定对应的路由
channel.queue_bind(queue_name, exchange='ex2', routing_key='test2')
channel.queue_bind(queue_name, exchange='ex2', routing_key='test3')
# 可以一个队列绑定多个路由
def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))

channel.basic_consume(queue_name, callback, True)
print('queue:{} start...'.format(queue_name))
channel.start_consuming()

模糊匹配发布订阅-topic

在发布订阅时也可以通过模糊匹配路由,当符合匹配规则的路由将会接收到消息,其中常用的通配符有*#*后面能够匹配一个单词,#后面能够匹配一个或多个单词(例如有a.#a.*的匹配规则,那么a.x能被两个规则都匹配到,但a.x.y只能被a.#匹配到),举例:

发布者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex3', exchange_type='topic')
# 声明一个名为ex3的交换器,发布类型为指定模糊匹配路由分发
channel.basic_publish(exchange='ex3',
                      routing_key='test.a',
                      body="test.# or test.* will get this msg!"
                      )
channel.basic_publish(exchange='ex3',
                      routing_key='test.b.c',
                      body="only test.# will get this msg!"
                      )
connection.close()
订阅者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex3', exchange_type='topic')
# 声明一个名为ex3的交换器,发布类型为模糊匹配路由接收
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(queue_name, exchange='ex3', routing_key='test.#')
# 这里因为routing_key='test.#',所以test.a和test.b.c的消息都能接收到

def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))

channel.basic_consume(queue_name, callback, True)
print('queue:{} start...'.format(queue_name))
channel.start_consuming()

RPC

远程过程调用,一个简单的理解就是假如本地要调用一个函数,而服务端已经实现了该函数,那么可以向服务端请求调用该函数,并把返回值返回给本地,具体参考:https://www.cnblogs.com/goldsunshine/p/8665456.html

其他操作

pika模块当中也提供了如删除/解绑队列、删除/解绑交换器等操作,举例:

import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_delete("ex1")
# 删除交换器
channel.queue_delete("aaa")
# 删除队列
connection.close()

更多关于pika操作参考:https://www.cnblogs.com/cwp-bg/p/8426188.html

更多参考

RabbitMQ的六种工作模式
Python RabbitMQ原理和使用场景以及模式

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349