RocketMQ 的OrderMessage demo

如何保证消息的顺序消费,只要将一组需要顺序消费的消息发送到同一个broker的同一个队列上,并且消费者采用有序Listener即可。

下面的代码,发送十个订单,每个订单有创建,支付,发货状态;

@RequestMapping("/order_mq")

@ResponseBody

public Result oreder_mq() {

String[] tags = new String[] { "createTag", "payTag", "sendTag" };

for (int orderId = 0; orderId < 10; orderId++) {

for (int type = 0; type < 3; type++) {

Message msg = new Message("orderTopic", tags[type % tags.length], orderId + ":" + type, (orderId + ":" + type).getBytes());

sender.sendOrderMessage(orderId, msg);

}

}

return Result.success("Hello,world"); }

MQSender.java

public void sendOrderMessage(int orderId, Message msg) {

log.info("send message:" + msg);

rocketMQTemplate.asyncSendOrderly("orderTopic", msg, orderId + "", null, 3000);

}

MQReceiver.java需要实现RocketMQPushConsumerLifecycleListener,并且注册MessageListenerOrderly

@Service

@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "my-consumer_orderTopic")

public class MQReceiver1 implements RocketMQListener, RocketMQPushConsumerLifecycleListener {

private static Logger log = LoggerFactory.getLogger(MQReceiver1.class);

public void prepareStart(DefaultMQPushConsumer consumer) { consumer.registerMessageListener(new MessageListenerOrderly(){

public ConsumeOrderlyStatus consumeMessage(

List msgs, ConsumeOrderlyContext context) {

try {

log.info(new String(msgs.get(0).getBody(), "UTF-8"));

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

return ConsumeOrderlyStatus.SUCCESS;

} }); } 

}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容