定义:
- MQ(消息队列)是一种在不同服务之间的通信技术,通过在消息队列中存储消息,生产者和消费者可以解耦,分别进行异步的发送和接收消息。
- MQ在分布式系统、微服务架构和异步处理场景中应用广泛。
用途:
- 异步处理:发送方可以立即返回,无需等待接收方处理完毕。接收方可以在方便时处理消息,从而提高系统响应速度和吞吐量。
实践:
- 场景:用户购物下单场景,下单后,订单服务会把订单信息通过mq发送给其他服务,比如库存服务,让其对相应商品进行数量删减。
- 真实流程:
1. 订单服务把订单详情发到mq,会立即向用户返回订单确认信息,包含订单ID、订单状态
2. 此时订单服务除了要发订单详情给mq,也指定了一个回复队列,这个回复队列是订单服务用来接收库存服务处理结果的
3. 库存服务在处理完消息后,将结果发到订单服务指定的回复队列
4. 订单服务监听回复队列,接收库存服务的处理结果
5. 订单服务更新订单状态
订单服务在发送订单创建消息时指定了一个回复队列,库存服务在处理完成后将处理结果发送到这个回复队列。订单服务监听该回复队列,接收到库存服务的处理结果后进行相应操作。这种方法可以确保订单服务能够正确地接收到库存服务的处理结果,实现服务间的可靠通信,相关伪代码如下:
- 订单服务发送消息(带上回复队列)
import pika
import json
def send_order_created_message(order_data):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列(与库存服务保持一致)
channel.queue_declare(queue='order_created')
# 声明回复队列
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue
# 设置回调队列属性
properties = pika.BasicProperties(reply_to=callback_queue)
# 发送消息到指定队列
channel.basic_publish(exchange='',
routing_key='order_created',
body=json.dumps(order_data),
properties=properties)
print(f" [x] Sent {order_data} with callback queue {callback_queue}")
connection.close()
# 示例消息
order_data = {
"order_id": "123456",
"user_id": "user_001",
"amount": 99.99,
"product_id": "product_001"
}
send_order_created_message(order_data)
- 库存服务处理消息并回复订单服务
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_created')
def handle_order_created(ch, method, properties, body):
order_data = json.loads(body)
order_id = order_data['order_id']
# 模拟库存处理逻辑
print(f" [x] Processing Order Created message: {order_data}")
# 假设处理成功
response = {
"order_id": order_id,
"status": "success",
"message": "Inventory updated successfully"
}
# 发送处理结果到回调队列
ch.basic_publish(exchange='',
routing_key=properties.reply_to,
body=json.dumps(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='order_created',
on_message_callback=handle_order_created,
auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
- 订单服务监听回复队列
import pika
import json
def on_response(ch, method, properties, body):
response = json.loads(body)
print(f" [x] Received response: {response}")
# 根据响应执行相应操作,例如更新订单状态等
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明回复队列
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue
channel.basic_consume(queue=callback_queue,
on_message_callback=on_response,
auto_ack=True)
print(f' [*] Waiting for responses on {callback_queue}. To exit press CTRL+C')
# 示例:发送订单创建消息
order_data = {
"order_id": "123456",
"user_id": "user_001",
"amount": 99.99,
"product_id": "product_001"
}
send_order_created_message(order_data)
# 开始监听回复队列
channel.start_consuming()
总结
通过使用消息队列,订单服务和库存服务可以实现解耦和异步通信:
- 订单服务:在用户下单时,将订单详情发送到消息队列。
- 消息队列:充当中间层,存储并转发消息。
- 库存服务:从消息队列中接收订单详情,并据此更新库存。
这种方式不仅提高了系统的响应速度和可扩展性,还增强了系统的可靠性和灵活性。在微服务架构中,消息队列是实现服务之间松耦合通信的重要工具。
如何测试
通过研发要到发送到mq的消息内容,常见的是json格式,还需要mq服务器地址信息、MQ服务器账号信息
- 具体的测试方法(测试mq幂等性)
1. 配置生产者函数:修改 send_order_created_message 函数以使用远程MQ服务器的连接信息。
2. 配置消费者函数:确保消费者函数使用同样的连接信息,能够连接到远程MQ服务器并处理消息。
3. 测试脚本:编写测试脚本,发送重复消息并验证消费者只处理一次。
- 相关伪代码
配置生产者函数
import pika
import json
def send_order_created_message(order_data, times=1, mq_host='remote-mq-server', mq_port=5672, mq_user='user', mq_password='password', queue_name='order_created'):
credentials = pika.PlainCredentials(mq_user, mq_password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue=queue_name)
for _ in range(times):
channel.basic_publish(exchange='',
routing_key=queue_name,
body=json.dumps(order_data))
connection.close()
print(f"Sent {times} messages to the queue {queue_name}")
配置消费者函数
import pika
import json
processed_orders = set()
def handle_order_created(ch, method, properties, body):
order = json.loads(body)
order_id = order['order_id']
if order_id in processed_orders:
print(f"Order {order_id} already processed, skipping.")
else:
print(f"Processing order: {order}")
# 处理订单逻辑
processed_orders.add(order_id)
# 确认消息已被处理
ch.basic_ack(delivery_tag=method.delivery_tag)
def start_consumer(mq_host='remote-mq-server', mq_port=5672, mq_user='user', mq_password='password', queue_name='order_created'):
credentials = pika.PlainCredentials(mq_user, mq_password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(queue=queue_name,
on_message_callback=handle_order_created,
auto_ack=False)
print('Waiting for orders. To exit press CTRL+C')
channel.start_consuming()
测试脚本
# 示例订单数据
order = {
'order_id': '12345',
'user_id': '67890',
'items': [
{'item_id': 'abc123', 'quantity': 2},
{'item_id': 'def456', 'quantity': 1}
],
'total_amount': 150.00
}
# 发送10次相同的消息
send_order_created_message(order, times=10, mq_host='your-mq-server', mq_port=5672, mq_user='your-username', mq_password='your-password', queue_name='order_created')
# 启动消费者,验证只处理一次
start_consumer(mq_host='your-mq-server', mq_port=5672, mq_user='your-username', mq_password='your-password', queue_name='order_created')
步骤说明
1. 配置生产者函数:在 send_order_created_message 函数中,通过 mq_host、mq_port、mq_user 和 mq_password 参数传递MQ服务器的连接信息。
2. 配置消费者函数:在 start_consumer 函数中,通过相同的参数连接到远程MQ服务器。
3. 运行测试:
运行测试脚本的发送部分,使用 send_order_created_message 函数发送10次相同的消息。
运行测试脚本的消费部分,使用 start_consumer 函数启动消费者,验证只处理一条消息。