MQ原理及如何测试

定义:
  • MQ(消息队列)是一种在不同服务之间的通信技术,通过在消息队列中存储消息,生产者和消费者可以解耦,分别进行异步的发送和接收消息。
  • MQ在分布式系统、微服务架构和异步处理场景中应用广泛。
用途:
  • 异步处理:发送方可以立即返回,无需等待接收方处理完毕。接收方可以在方便时处理消息,从而提高系统响应速度和吞吐量。
实践:
  • 场景:用户购物下单场景,下单后,订单服务会把订单信息通过mq发送给其他服务,比如库存服务,让其对相应商品进行数量删减。
  • 真实流程:
1. 订单服务把订单详情发到mq,会立即向用户返回订单确认信息,包含订单ID、订单状态
2. 此时订单服务除了要发订单详情给mq,也指定了一个回复队列,这个回复队列是订单服务用来接收库存服务处理结果的
3. 库存服务在处理完消息后,将结果发到订单服务指定的回复队列
4. 订单服务监听回复队列,接收库存服务的处理结果
5. 订单服务更新订单状态

订单服务在发送订单创建消息时指定了一个回复队列,库存服务在处理完成后将处理结果发送到这个回复队列。订单服务监听该回复队列,接收到库存服务的处理结果后进行相应操作。这种方法可以确保订单服务能够正确地接收到库存服务的处理结果,实现服务间的可靠通信,相关伪代码如下:
  • 订单服务发送消息(带上回复队列)
import pika
import json

def send_order_created_message(order_data):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # 声明队列(与库存服务保持一致)
    channel.queue_declare(queue='order_created')
    
    # 声明回复队列
    result = channel.queue_declare(queue='', exclusive=True)
    callback_queue = result.method.queue
    
    # 设置回调队列属性
    properties = pika.BasicProperties(reply_to=callback_queue)
    
    # 发送消息到指定队列
    channel.basic_publish(exchange='',
                          routing_key='order_created',
                          body=json.dumps(order_data),
                          properties=properties)
    
    print(f" [x] Sent {order_data} with callback queue {callback_queue}")
    connection.close()

# 示例消息
order_data = {
    "order_id": "123456",
    "user_id": "user_001",
    "amount": 99.99,
    "product_id": "product_001"
}

send_order_created_message(order_data)
  • 库存服务处理消息并回复订单服务
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_created')

def handle_order_created(ch, method, properties, body):
    order_data = json.loads(body)
    order_id = order_data['order_id']
    
    # 模拟库存处理逻辑
    print(f" [x] Processing Order Created message: {order_data}")
    # 假设处理成功
    response = {
        "order_id": order_id,
        "status": "success",
        "message": "Inventory updated successfully"
    }
    
    # 发送处理结果到回调队列
    ch.basic_publish(exchange='',
                     routing_key=properties.reply_to,
                     body=json.dumps(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='order_created',
                      on_message_callback=handle_order_created,
                      auto_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  • 订单服务监听回复队列
import pika
import json

def on_response(ch, method, properties, body):
    response = json.loads(body)
    print(f" [x] Received response: {response}")
    # 根据响应执行相应操作,例如更新订单状态等

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明回复队列
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_consume(queue=callback_queue,
                      on_message_callback=on_response,
                      auto_ack=True)

print(f' [*] Waiting for responses on {callback_queue}. To exit press CTRL+C')

# 示例:发送订单创建消息
order_data = {
    "order_id": "123456",
    "user_id": "user_001",
    "amount": 99.99,
    "product_id": "product_001"
}
send_order_created_message(order_data)

# 开始监听回复队列
channel.start_consuming()
总结

通过使用消息队列,订单服务和库存服务可以实现解耦和异步通信:

  • 订单服务:在用户下单时,将订单详情发送到消息队列。
  • 消息队列:充当中间层,存储并转发消息。
  • 库存服务:从消息队列中接收订单详情,并据此更新库存。

这种方式不仅提高了系统的响应速度和可扩展性,还增强了系统的可靠性和灵活性。在微服务架构中,消息队列是实现服务之间松耦合通信的重要工具。

如何测试

通过研发要到发送到mq的消息内容,常见的是json格式,还需要mq服务器地址信息、MQ服务器账号信息

  • 具体的测试方法(测试mq幂等性)
1. 配置生产者函数:修改 send_order_created_message 函数以使用远程MQ服务器的连接信息。
2. 配置消费者函数:确保消费者函数使用同样的连接信息,能够连接到远程MQ服务器并处理消息。
3. 测试脚本:编写测试脚本,发送重复消息并验证消费者只处理一次。
  • 相关伪代码
    配置生产者函数
import pika
import json

def send_order_created_message(order_data, times=1, mq_host='remote-mq-server', mq_port=5672, mq_user='user', mq_password='password', queue_name='order_created'):
    credentials = pika.PlainCredentials(mq_user, mq_password)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))
    channel = connection.channel()
    
    channel.queue_declare(queue=queue_name)

    for _ in range(times):
        channel.basic_publish(exchange='',
                              routing_key=queue_name,
                              body=json.dumps(order_data))
    
    connection.close()
    print(f"Sent {times} messages to the queue {queue_name}")

配置消费者函数

import pika
import json

processed_orders = set()

def handle_order_created(ch, method, properties, body):
    order = json.loads(body)
    order_id = order['order_id']
    
    if order_id in processed_orders:
        print(f"Order {order_id} already processed, skipping.")
    else:
        print(f"Processing order: {order}")
        # 处理订单逻辑
        processed_orders.add(order_id)
        # 确认消息已被处理
        ch.basic_ack(delivery_tag=method.delivery_tag)

def start_consumer(mq_host='remote-mq-server', mq_port=5672, mq_user='user', mq_password='password', queue_name='order_created'):
    credentials = pika.PlainCredentials(mq_user, mq_password)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))
    channel = connection.channel()

    channel.queue_declare(queue=queue_name)

    channel.basic_consume(queue=queue_name,
                          on_message_callback=handle_order_created,
                          auto_ack=False)

    print('Waiting for orders. To exit press CTRL+C')
    channel.start_consuming()

测试脚本

# 示例订单数据
order = {
    'order_id': '12345',
    'user_id': '67890',
    'items': [
        {'item_id': 'abc123', 'quantity': 2},
        {'item_id': 'def456', 'quantity': 1}
    ],
    'total_amount': 150.00
}

# 发送10次相同的消息
send_order_created_message(order, times=10, mq_host='your-mq-server', mq_port=5672, mq_user='your-username', mq_password='your-password', queue_name='order_created')

# 启动消费者,验证只处理一次
start_consumer(mq_host='your-mq-server', mq_port=5672, mq_user='your-username', mq_password='your-password', queue_name='order_created')

步骤说明

1. 配置生产者函数:在 send_order_created_message 函数中,通过 mq_host、mq_port、mq_user 和 mq_password 参数传递MQ服务器的连接信息。
2. 配置消费者函数:在 start_consumer 函数中,通过相同的参数连接到远程MQ服务器。
3. 运行测试:
运行测试脚本的发送部分,使用 send_order_created_message 函数发送10次相同的消息。
运行测试脚本的消费部分,使用 start_consumer 函数启动消费者,验证只处理一条消息。
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容