介绍
rabbitmq是基于Erlang语言编写的一种消息队列中间件,具体的内容网上有很多这里就不赘述了,本文主要介绍一下在python当中基于第三方库pika
对rabbitmq的简单使用
安装
服务端
ubuntu安装参考
https://www.cnblogs.com/vipstone/p/9184314.html
centos参考
https://blog.csdn.net/zhuzhezhuzhe1/article/details/80464291
客户端
pip install pika
场景
- 任务队列
- 发布订阅内容
- ...
生产者-消费者模式
在消息队列当中,最简单的就是生产者消费者模式,即生产者发布一条消息,一个或者多个消费者在监听等待,最终发布的消息被其中一个消费者给取走执行的模式,下面是简单的示例:
生产者
import pika
credentials = pika.PlainCredentials('test', 'test')
# rabbitmq队列账号密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
# 连接配置:主要包括host、port(默认15672)、credentials(登录用户名密码)
channel = connection.channel()
# 创建一个连接通道
channel.queue_declare(queue='aaa')
# 声明一个名为aaa的队列
channel.basic_publish(exchange='',
routing_key='aaa',
body='this is a msg!')
# 往aaa队列发送一条消息
connection.close()
#关闭连接
消费者
import pika
credentials = pika.PlainCredentials('test', 'test')
# rabbitmq队列账号密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
# 连接配置:主要包括host、port(默认5672)、credentials(登录用户名密码)
channel = connection.channel()
# 创建一个连接通道
channel.queue_declare(queue='aaa')
# 声明一个名为aaa的队列
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
channel.basic_consume('aaa', callback, auto_ack=True)
# 从aaa队列中取出一条消息,并执行对应回调,第三个参数代表取到消息后自动回复执行完成,生产者会将该任务消息删除
# 在旧版里传参顺序和参数名可能有所不同(旧版里是:callback, queue='aaa', no_ack=True)
print('start...')
channel.start_consuming()
# 开启循环监听消息队列
此时如果打开多个消费者,那么可以发现生产者的发送的消息队列将会被消费者按顺序取走
生产者消费者模式-常用配置
任务完成回复
auto_ack
参数可以配置任务是否需要回复,默认是False
,即任务被取走之后,只有消费者在回调当中执行了ch.basic_ack(delivery_tag=method.delivery_tag)
方法以后,代表任务执行完成,此时生产者才会把任务从消息队列当中删除,若消费者没能在关闭前执行上面那句方法,那么别的消费者将会在之后取走该任务去执行,直到生产者接收到执行完成的指令为止。如果auto_ack
值为True
,那么当任务被取走之后,生产者将直接把队列中的任务删除。举例:
# 消费者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
ch.basic_ack(delivery_tag=method.delivery_tag)
# 回复任务完成
channel.basic_consume('aaa', callback, auto_ack=False)
# 设置auto_ack=False,此时消费者必须提供任务完成的回复
print('start...')
channel.start_consuming()
消息队列持久化
在队列声明时,可以通过参数durable
配置是否需要持久化,默认为False
,即不需要持久化,此时如果服务端挂了,那么消息队列的内容将会丢失。如果配置持久化,那么首先需要在声明当中设置durable=True
,然后在发布时也配置分发模式为持久化分发,举例:
# 生产者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='bbb', durable=True)
# 声明一个消息持久化的队列,因为之前已经创建了aaa队列,并且不是持久化的队列,所以这里新建一个不存在的队列
channel.basic_publish(exchange='',
routing_key='bbb',
body="this is a durable msg!",
properties=pika.BasicProperties(
delivery_mode=2,
# 配置消息持久化
))
connection.close()
闲置消费
默认是按照客户端的顺序一个个循环派发任务的,但要是第一个客户端没执行完,而下一个客户端已经执行完了,此时如果还把任务派发给第一个就有些不好了,所以需要将任务派发给闲置的客户端,类似于Nginx的负载均衡,实现只需要在消费者当中加入一句代码:channel.basic_qos(prefetch_count=1)
,举例:
# 消费者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
# 设置闲置消费
channel.basic_consume('aaa', callback, auto_ack=False)
print('start...')
channel.start_consuming()
发布订阅模式-fanout
前面介绍的都是生产者发布一条任务消息,然后一个或者多个消费者中的其中一个取走这个任务去执行的情况。而有一种场景,如广播、微信公众号的消息推送这种,往往需要将一条消息发布给所有的消费者执行,而在rabbitmq当中就可以通过创建一个exchange
交换器来创建和管理多个队列,即一个exchange
下有多个队列,并且每个队列对应一个消费者,当有消息的时候,exchange
会将消息发送给自己所管理的所有队列,此时需要设置类型为fanout
,举例:
发布者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex1', exchange_type='fanout')
# 声明一个名为ex1的交换器,发布类型为集体分发
channel.basic_publish(exchange='ex1',
routing_key='',
body="everyone on ex1 will get this msg!"
)
# 往ex1中所有队列分发消息
connection.close()
订阅者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex1', exchange_type='fanout')
# 声明一个名为ex1的交换器,发布类型为集体分发
result = channel.queue_declare(queue='', exclusive=True)
# 这里不定义名字,通过exclusive=True生成一个名字不重复的队列
queue_name = result.method.queue
# 获取队列名
channel.queue_bind(queue_name, exchange='ex1')
# 绑定队列名和ex1交换器
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
channel.basic_consume(queue_name, callback, True)
# 使用随机生成的队列名在ex1交换器下接收消息
print('queue:{} start...'.format(queue_name))
channel.start_consuming()
注:
发布订阅模式和生产消费者模式还有一点不同就是:生产消费者模式当中,生产者产生的消费只要没被取走,那么消息就会一直留着等待被消费者取走;而在发布订阅模式当中,发布者只会发布一次,发布完该消息就会被删除,因此如果订阅者没有在发布者发布时接收到消息,将永远错过接收的机会(就像关注公众号以后,公众号并不会把以前的所有历史推送信息也给你重新再推送一遍一样)
指定发布订阅-direct
对于发布时,可以不给所有队列发送消息,而是指定给哪些队列发送,举例:
发布者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex2', exchange_type='direct')
# 声明一个名为ex2的交换器,发布类型为指定路由分发
channel.basic_publish(exchange='ex2',
routing_key='test1',
body="only test1 will get this msg!"
)
# 往ex2中test1的路由分发消息
channel.basic_publish(exchange='ex2',
routing_key='test2',
body="only test2 will get this msg!"
)
# 往ex2中test2的路由分发消息
connection.close()
订阅者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex2', exchange_type='direct')
# 声明一个名为ex2的交换器,接收类型为指定路由
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(queue_name, exchange='ex2', routing_key='test1')
# 绑定ex2与队列,以及绑定对应的路由
channel.queue_bind(queue_name, exchange='ex2', routing_key='test2')
channel.queue_bind(queue_name, exchange='ex2', routing_key='test3')
# 可以一个队列绑定多个路由
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
channel.basic_consume(queue_name, callback, True)
print('queue:{} start...'.format(queue_name))
channel.start_consuming()
模糊匹配发布订阅-topic
在发布订阅时也可以通过模糊匹配路由,当符合匹配规则的路由将会接收到消息,其中常用的通配符有*
和#
,*
后面能够匹配一个单词,#
后面能够匹配一个或多个单词(例如有a.#
和a.*
的匹配规则,那么a.x
能被两个规则都匹配到,但a.x.y
只能被a.#
匹配到),举例:
发布者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex3', exchange_type='topic')
# 声明一个名为ex3的交换器,发布类型为指定模糊匹配路由分发
channel.basic_publish(exchange='ex3',
routing_key='test.a',
body="test.# or test.* will get this msg!"
)
channel.basic_publish(exchange='ex3',
routing_key='test.b.c',
body="only test.# will get this msg!"
)
connection.close()
订阅者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex3', exchange_type='topic')
# 声明一个名为ex3的交换器,发布类型为模糊匹配路由接收
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(queue_name, exchange='ex3', routing_key='test.#')
# 这里因为routing_key='test.#',所以test.a和test.b.c的消息都能接收到
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
channel.basic_consume(queue_name, callback, True)
print('queue:{} start...'.format(queue_name))
channel.start_consuming()
RPC
远程过程调用,一个简单的理解就是假如本地要调用一个函数,而服务端已经实现了该函数,那么可以向服务端请求调用该函数,并把返回值返回给本地,具体参考:https://www.cnblogs.com/goldsunshine/p/8665456.html
其他操作
在pika
模块当中也提供了如删除/解绑队列、删除/解绑交换器等操作,举例:
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_delete("ex1")
# 删除交换器
channel.queue_delete("aaa")
# 删除队列
connection.close()
更多关于pika操作参考:https://www.cnblogs.com/cwp-bg/p/8426188.html