需求:在topic工作模式下,消费者(接收端)仍可以接收消费者启动前,发送端发送的消息。
分析:在节点上,只需要持久化一个queue,这样就可以无论发送端什么时候发送消息,queue都可以接收到,接收端也能消费。
(集群普通模式下,2个节点的queue不是镜像的,需要镜像队列使得只要其中一个节点在运行,就能发送消息,不会丢失)
1.消息以及队列持久化
前提:在服务器上创建一个持久化的消息队列(镜像队列查看 https://www.jianshu.com/p/a10dfc1a0ffb 最后)
发送端代码
import pika,json
host ='172.24.154.154'
count =1
while True:
if count >3:
break
try:
credentials = pika.PlainCredentials('chenfw', 'chenfw123')
connection = pika.BlockingConnection(pika.ConnectionParameters(host=host,
port=5672,
credentials=credentials
))
except pika.exceptions.AMQPConnectionErroras e:
host ='172.24.154.155'
count +=1
else:
break
channel = connection.channel()
# 申明交换机
channel.exchange_declare(exchange='activiti', exchange_type='direct', durable=True)
message = {'no':'否', 'yes':'是'}
# 发布 信息 exchange routing_key--指定管道 body--消息内容
channel.basic_publish(exchange='activiti', routing_key='activiti.to.biying', body=json.dumps(message, ensure_ascii=False),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(" [x] Sent %r:%r" % ('activiti send message', message))
connection.close()
接收端代码
# coding:utf-8
"""
CreateBy: chenfw
DATE: 2019/7/25
"""
import pika
import sys
import json
credentials = pika.PlainCredentials('chenfw', 'chenfw123')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.24.154.154',
port=5672,
credentials=credentials
))
channel = connection.channel()
# 申明交换机 durable 交换机持久化
channel.exchange_declare(exchange_type='direct', exchange='activiti', durable=True)
# durable=True 队列持久化
channel.queue_declare('activitiToBiying', durable=True)
channel.queue_bind(exchange='activiti',
queue='activitiToBiying',
routing_key='activiti.to.biying')
def callback(ch, method, properties, body):
# print(" [x] %r:%r" % (method.routing_key, body,))
ss =str(body, encoding='utf-8')
print(ss)
print(json.loads(ss))
channel.basic_consume(on_message_callback=callback,
queue='activitiToBiying',
auto_ack=True)
channel.start_consuming()