MQ原理及如何测试

定义:
  • MQ(消息队列)是一种在不同服务之间的通信技术,通过在消息队列中存储消息,生产者和消费者可以解耦,分别进行异步的发送和接收消息。
  • MQ在分布式系统、微服务架构和异步处理场景中应用广泛。
用途:
  • 异步处理:发送方可以立即返回,无需等待接收方处理完毕。接收方可以在方便时处理消息,从而提高系统响应速度和吞吐量。
实践:
  • 场景:用户购物下单场景,下单后,订单服务会把订单信息通过mq发送给其他服务,比如库存服务,让其对相应商品进行数量删减。
  • 真实流程:
1. 订单服务把订单详情发到mq,会立即向用户返回订单确认信息,包含订单ID、订单状态
2. 此时订单服务除了要发订单详情给mq,也指定了一个回复队列,这个回复队列是订单服务用来接收库存服务处理结果的
3. 库存服务在处理完消息后,将结果发到订单服务指定的回复队列
4. 订单服务监听回复队列,接收库存服务的处理结果
5. 订单服务更新订单状态

订单服务在发送订单创建消息时指定了一个回复队列,库存服务在处理完成后将处理结果发送到这个回复队列。订单服务监听该回复队列,接收到库存服务的处理结果后进行相应操作。这种方法可以确保订单服务能够正确地接收到库存服务的处理结果,实现服务间的可靠通信,相关伪代码如下:
  • 订单服务发送消息(带上回复队列)
import pika
import json

def send_order_created_message(order_data):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # 声明队列(与库存服务保持一致)
    channel.queue_declare(queue='order_created')
    
    # 声明回复队列
    result = channel.queue_declare(queue='', exclusive=True)
    callback_queue = result.method.queue
    
    # 设置回调队列属性
    properties = pika.BasicProperties(reply_to=callback_queue)
    
    # 发送消息到指定队列
    channel.basic_publish(exchange='',
                          routing_key='order_created',
                          body=json.dumps(order_data),
                          properties=properties)
    
    print(f" [x] Sent {order_data} with callback queue {callback_queue}")
    connection.close()

# 示例消息
order_data = {
    "order_id": "123456",
    "user_id": "user_001",
    "amount": 99.99,
    "product_id": "product_001"
}

send_order_created_message(order_data)
  • 库存服务处理消息并回复订单服务
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_created')

def handle_order_created(ch, method, properties, body):
    order_data = json.loads(body)
    order_id = order_data['order_id']
    
    # 模拟库存处理逻辑
    print(f" [x] Processing Order Created message: {order_data}")
    # 假设处理成功
    response = {
        "order_id": order_id,
        "status": "success",
        "message": "Inventory updated successfully"
    }
    
    # 发送处理结果到回调队列
    ch.basic_publish(exchange='',
                     routing_key=properties.reply_to,
                     body=json.dumps(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='order_created',
                      on_message_callback=handle_order_created,
                      auto_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  • 订单服务监听回复队列
import pika
import json

def on_response(ch, method, properties, body):
    response = json.loads(body)
    print(f" [x] Received response: {response}")
    # 根据响应执行相应操作,例如更新订单状态等

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明回复队列
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_consume(queue=callback_queue,
                      on_message_callback=on_response,
                      auto_ack=True)

print(f' [*] Waiting for responses on {callback_queue}. To exit press CTRL+C')

# 示例:发送订单创建消息
order_data = {
    "order_id": "123456",
    "user_id": "user_001",
    "amount": 99.99,
    "product_id": "product_001"
}
send_order_created_message(order_data)

# 开始监听回复队列
channel.start_consuming()
总结

通过使用消息队列,订单服务和库存服务可以实现解耦和异步通信:

  • 订单服务:在用户下单时,将订单详情发送到消息队列。
  • 消息队列:充当中间层,存储并转发消息。
  • 库存服务:从消息队列中接收订单详情,并据此更新库存。

这种方式不仅提高了系统的响应速度和可扩展性,还增强了系统的可靠性和灵活性。在微服务架构中,消息队列是实现服务之间松耦合通信的重要工具。

如何测试

通过研发要到发送到mq的消息内容,常见的是json格式,还需要mq服务器地址信息、MQ服务器账号信息

  • 具体的测试方法(测试mq幂等性)
1. 配置生产者函数:修改 send_order_created_message 函数以使用远程MQ服务器的连接信息。
2. 配置消费者函数:确保消费者函数使用同样的连接信息,能够连接到远程MQ服务器并处理消息。
3. 测试脚本:编写测试脚本,发送重复消息并验证消费者只处理一次。
  • 相关伪代码
    配置生产者函数
import pika
import json

def send_order_created_message(order_data, times=1, mq_host='remote-mq-server', mq_port=5672, mq_user='user', mq_password='password', queue_name='order_created'):
    credentials = pika.PlainCredentials(mq_user, mq_password)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))
    channel = connection.channel()
    
    channel.queue_declare(queue=queue_name)

    for _ in range(times):
        channel.basic_publish(exchange='',
                              routing_key=queue_name,
                              body=json.dumps(order_data))
    
    connection.close()
    print(f"Sent {times} messages to the queue {queue_name}")

配置消费者函数

import pika
import json

processed_orders = set()

def handle_order_created(ch, method, properties, body):
    order = json.loads(body)
    order_id = order['order_id']
    
    if order_id in processed_orders:
        print(f"Order {order_id} already processed, skipping.")
    else:
        print(f"Processing order: {order}")
        # 处理订单逻辑
        processed_orders.add(order_id)
        # 确认消息已被处理
        ch.basic_ack(delivery_tag=method.delivery_tag)

def start_consumer(mq_host='remote-mq-server', mq_port=5672, mq_user='user', mq_password='password', queue_name='order_created'):
    credentials = pika.PlainCredentials(mq_user, mq_password)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))
    channel = connection.channel()

    channel.queue_declare(queue=queue_name)

    channel.basic_consume(queue=queue_name,
                          on_message_callback=handle_order_created,
                          auto_ack=False)

    print('Waiting for orders. To exit press CTRL+C')
    channel.start_consuming()

测试脚本

# 示例订单数据
order = {
    'order_id': '12345',
    'user_id': '67890',
    'items': [
        {'item_id': 'abc123', 'quantity': 2},
        {'item_id': 'def456', 'quantity': 1}
    ],
    'total_amount': 150.00
}

# 发送10次相同的消息
send_order_created_message(order, times=10, mq_host='your-mq-server', mq_port=5672, mq_user='your-username', mq_password='your-password', queue_name='order_created')

# 启动消费者,验证只处理一次
start_consumer(mq_host='your-mq-server', mq_port=5672, mq_user='your-username', mq_password='your-password', queue_name='order_created')

步骤说明

1. 配置生产者函数:在 send_order_created_message 函数中,通过 mq_host、mq_port、mq_user 和 mq_password 参数传递MQ服务器的连接信息。
2. 配置消费者函数:在 start_consumer 函数中,通过相同的参数连接到远程MQ服务器。
3. 运行测试:
运行测试脚本的发送部分,使用 send_order_created_message 函数发送10次相同的消息。
运行测试脚本的消费部分,使用 start_consumer 函数启动消费者,验证只处理一条消息。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容