顺序消息指的是生产者投递消息的顺序与消息消费的顺序一致。某个Topic上所有消息都是顺序的称为全局顺序消息,如果是具有某个相同业务ID的一组消息保证其顺序性则成为部分顺序消息。
并发消费与顺序消费
当使用DefaultMQPushConsumer
做消息消费时,需要通过registerMessageListener(MessageListener)
方法来注册消息的监听器来编写如何处理消息的业务逻辑。MessageListener
接口有两个子接口MessageListenerConcurrently
和MessageListenerOrderly
:
-
并发消费,也就是
MessageListenerConcurrently
不保证消息顺序,消费的顺序跟生产者投递顺序无关,消息消费最大并发度为消费者线程池的大小。 - 全局顺序消费,只能指定该Topic只有一个queue
-
部分顺序消费,生产投递时确保具有相同业务ID的消息发往相同的queue,消费侧多线程去从该Topic下的多个queue拉取进行消费,但
MessageListenerOrderly
会使用锁来确保多线程从同一个queue的消费处理是串行的。
实际场景中更多的是上面所说的部分顺序消费需求,下面来看看如何实现。
部分顺序代码实现
上文中提到了,部分顺序消息就是需要生产端与消费端配合,生产投递时使用MessageQueueSelector
将相同业务ID的消息投递到同一个queue里,消费端使用MessageListenerOrderly
使得多个消费者线程能够串行顺序的从queue中消费。部分顺序消息下、topic消息消费的最大并行度取决于该topic有多少个queue,每个queue的消费在多个线程中是串行的。
例子代码:
生产端:
/**
* 顺序消息投递
* 按照EventMessage中的msgId作为业务ID,
* 业务ID对topic下queue数量取模选定投递哪个queue
* 确保相同业务ID的消息都投递到同一个queue,从而使得该一组消息有序性
* */
@Slf4j
@Component
public class OrderlyMsgProducer {
@Value("${rocketmq.url}")
private String mqurl;
@Value("${rocketmq.accessKey}")
private String accessKey;
@Value("${rocketmq.secretKey}")
private String secretKey;
@Value("${rocketmq.producergroup.name}")
private String producerGroupName;
private DefaultMQProducer producer;
public SendResult publishOrderly(EventMessage eventMsg) {
try {
Message msg = new Message(eventMsg.getTopic(), eventMsg.getTag(), eventMsg.getMsgId(), JSON.toJSONString(eventMsg).getBytes("utf-8"));
SendResult result = producer.send(msg,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
for(MessageQueue queue : mqs) {
log.info(queue.toString());
}
log.info("Topic {} 共有{}个MessageQueue", eventMsg.getTopic(), mqs.size());
int id = Integer.parseInt((String)arg);
int mqNum = id % mqs.size();
log.info("业务ID={}消息投递到{}号Queue", id, mqNum);
return mqs.get(mqNum);
}},
eventMsg.getMsgId());
return result;
} catch (Exception e) {
log.error("顺序消息发送失败" + e.getMessage(), e);
e.printStackTrace();
}
return null;
}
@PostConstruct
public void init() {
producer = new DefaultMQProducer(producerGroupName + "-orderly", getAclRPCHook());
producer.setNamesrvAddr(mqurl);
try {
producer.start();
log.info("RocketMQ客户端顺序producer初始化...");
} catch (MQClientException e) {
e.printStackTrace();
}
}
@PreDestroy
public void shutdown() {
if(producer != null)
producer.shutdown();
}
private RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
}
消费端:
@Slf4j
@Component
public class OrderlyConsumer {
@Value("${rocketmq.url}")
private String mqurl;
@Value("${rocketmq.accessKey}")
private String accessKey;
@Value("${rocketmq.secretKey}")
private String secretKey;
@Value("${rocketmq.consumeThreadCorePoolSize:20}")
private int consumeThreadCorePoolSize;
@PostConstruct
public void init() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testapp-orderlyconsumer-group", getAclRPCHook(),
new AllocateMessageQueueAveragely());
try {
consumer.setNamesrvAddr(mqurl);
consumer.setConsumeThreadMin(consumeThreadCorePoolSize);
consumer.setConsumeThreadMax(consumeThreadCorePoolSize);
consumer.setPullBatchSize(32); // 一次长轮询最多从mq里拿多少个消息,默认32
consumer.setConsumeMessageBatchMaxSize(1);// 批量最多消费的消息、也即List<MessageExt> msgs的大小,默认1
consumer.subscribe("test-orderly-topic", "douchuzi");
//指定消息在同一个queue上串行顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
MessageExt msgExt = msgs.get(0);
String msgContent;
try {
msgContent = new String(msgExt.getBody(), "UTF-8");
log.info("收到消息{}", msgContent);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
} catch (Exception e) {
log.error("注册消费者出错" + e.getMessage(), e);
}
}
// Access Control List控制
private RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
}
测试:
for(int i=0; i<10; i++) {
EventMessage eventMsg = new EventMessage();
eventMsg.setTopic("test-orderly-topic");
eventMsg.setTag("douchuzi");
eventMsg.setMsgId("456");
eventMsg.setProducerGroup("testapp-producer-group-orderly");
eventMsg.setMsgBody("+" + i);
eventMsg.setPublishTime(LocalDateTime.now().toString());
SendResult result = orderlyMsgProducer.publishOrderly(eventMsg);
log.info("消息以发送,消息ID:{}, 消息发送状态:{}", result.getMsgId(), result.getSendStatus());
}
执行结果:
2022-02-02 22:08:31.149 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=0]
2022-02-02 22:08:31.150 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=1]
2022-02-02 22:08:31.151 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=2]
2022-02-02 22:08:31.151 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=3]
2022-02-02 22:08:31.152 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : Topic test-orderly-topic 共有4个MessageQueue
2022-02-02 22:08:31.152 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : 业务ID=456消息投递到0号Queue
2022-02-02 22:08:33.543 INFO 13256 --- [nio-8080-exec-2] com.wangan.controller.WanganController : 消息以发送,消息ID:7F00000133C84E0E2F2A09E6B0BE0000, 消息发送状态:SEND_OK
。。。。。。
。。。。。。
2022-02-02 22:08:33.757 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=0]
2022-02-02 22:08:33.757 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=1]
2022-02-02 22:08:33.757 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=2]
2022-02-02 22:08:33.757 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=3]
2022-02-02 22:08:33.758 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : Topic test-orderly-topic 共有4个MessageQueue
2022-02-02 22:08:33.758 INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer : 业务ID=456消息投递到0号Queue
2022-02-02 22:08:33.774 INFO 13256 --- [nio-8080-exec-2] com.wangan.controller.WanganController : 消息以发送,消息ID:7F00000133C84E0E2F2A09E6B1DE0009, 消息发送状态:SEND_OK
2022-02-02 22:08:51.130 INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer : 收到消息{"msgBody":"+0","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:31.088","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137 INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer : 收到消息{"msgBody":"+1","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.544","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137 INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer : 收到消息{"msgBody":"+2","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.576","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137 INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer : 收到消息{"msgBody":"+3","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.597","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137 INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer : 收到消息{"msgBody":"+4","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.628","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137 INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer : 收到消息{"msgBody":"+5","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.658","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137 INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer : 收到消息{"msgBody":"+6","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.681","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137 INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer : 收到消息{"msgBody":"+7","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.704","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137 INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer : 收到消息{"msgBody":"+8","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.727","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137 INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer : 收到消息{"msgBody":"+9","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.756","tag":"douchuzi","topic":"test-orderly-topic"}
可见连续投递的10条消息最后在消费端是按照顺序消费处理的。