上一篇 <<<RocketMQ如何动态扩容和缩容
下一篇 >>>RocketMQ单机版本安装
1.基本实现思路
a.生产者向我们的Broker(MQ服务器端)发送我们派单消息设置为半消息,该消息不可以被消费者消费。
b.在执行我们的本地的事务,将本地执行事务结果提交或者回滚告诉Broker
c.Broker获取本地事务的结果如果是已提交的话,将该半消息设置为允许被消费者消费,如果本地事务执行失败的情况下,将该半消息直接从Broker中移除
d.如果我们的本地事务没有将结果及时通知给我们的Broker,这时候我们Broker会主动定时(默认60s)查询本地事务结果
e.本地事务结果实际上就是一个回调方法,根据自己业务场景封装本地事务结果
事务回查的时间次数等配置在broker里
2.代码示例
2.1 生产者消息发送
/**
* 生产者消息发送
*/
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public String sendOrder() {
//数据封装
String orderId = (new Random().nextInt(1000)) +"";
UserEntity entity = new UserEntity();
entity.setId(Long.valueOf(orderId));
entity.setAge(12);
entity.setName("张三");
String mm = JSON.toJSONString(entity);
//消息封装
MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(mm);
stringMessageBuilder.setHeader("msg",mm);
Message msg = stringMessageBuilder.build();
//发送半消息
rocketMQTemplate.sendMessageInTransaction("tt0103","tpc0103",msg,null);
return orderId;
}
}
2.2 生产者监听程序
/**
* 生产者监听程序
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tt0103")
public class SyncProducerListener implements RocketMQLocalTransactionListener {
@Autowired
private TransationalUtils transationalUtils;
@Autowired
private OrderMapper orderMapper;
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
TransactionStatus beginStatus = null;
try {
beginStatus = transationalUtils.begin();
MessageHeaders headers = message.getHeaders();
String objMsg = (String) headers.get("msg");
if (StringUtils.isEmpty(objMsg)) {
return RocketMQLocalTransactionState.ROLLBACK;
}
UserEntity orderEntity = JSONObject.parseObject(objMsg, UserEntity.class);
int result = orderMapper.addOrder(orderEntity);
if (result > 0) {
transationalUtils.commit(beginStatus);
}
log.info("【本地业务执行完毕】 msg:{}, Object:{}", message, o);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
log.error("【执行本地业务异常】 exception message:{}", e.getMessage());
if (beginStatus != null) {
transationalUtils.rollback(beginStatus);
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* rocketmq主动查询本地事务提交情况
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
log.info("【执行检查任务】");
MessageHeaders headers = message.getHeaders();
String objMsg = (String) headers.get("msg");
if (StringUtils.isEmpty(objMsg)) {
return RocketMQLocalTransactionState.UNKNOWN;
}
UserEntity orderEntity = JSONObject.parseObject(objMsg, UserEntity.class);
Long orderId = orderEntity.getId();
UserEntity orderDbEntity = orderMapper.findOrderId(orderId);
if (orderDbEntity == null) {
return RocketMQLocalTransactionState.UNKNOWN;
}
return RocketMQLocalTransactionState.COMMIT;
}
}
2.3消费者消费
/**
* 消费者消费
*/
@Service
@RocketMQMessageListener(topic = "tpc0103", consumerGroup = "cpc0103")
public class OrdeConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
UserEntity orderEntity = JSONObject.parseObject(msg, UserEntity.class);
System.out.println(orderEntity.toString());
}
}
2.4异常情况
报错:The producer group has been created before, specify another name please.
producer.setInstanceName(producer.buildMQClientId());
推荐阅读:
<<<消息中间件的核心思想
<<<消息中间件常见问题汇总
<<<基于Netty简单手写消息中间件思路
<<<消息队列常用名词与中间件对比
<<<Rabbitmq基础知识
<<<Rabbitmq示例之点对点简单队列
<<<Rabbitmq示例之工作(公平)队列
<<<Rabbitmq示例之发布订阅模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq示例之RPC模式
<<<Rabbitmq队列模式总结
<<<Rabbitmq如何保证消息不丢失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保证幂等性
<<<Rabbitmq的重试策略
<<<Rabbitmq通过死信队列实现过期监听
<<<Rabbitmq解决分布式事务思路
<<<Rabbitmq解决分布式事务demo
<<<Rabbitmq环境安装
<<<Kafka中的专业术语都有哪些
<<<Kafka的设计原理介绍
<<<Kafka集群如何实现相互感知
<<<Kafka如何实现分区及指定分区消费
<<<Kafka如何保证消息顺序消费
<<<Kafka如何保证高吞吐量
<<<Kafka集群环境搭建
<<<RocketMQ架构原理
<<<RocketMQ、RabbitMQ和Kafka的对比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保证顺序消费demo
<<<RocketMQ如何动态扩容和缩容
<<<RocketMQ单机版本安装
<<<RocketMQ集群环境程序启用相关知识点
<<<RocketMQ单机做主备实操
<<<RocketMQ所有配置说明