RocketMQ基于Spring编程模型的消息收发
添加rocketmq-spring-boot-starter等相关依赖
<!-- Spring Boot parent: supplies managed versions for the starters below that declare none -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.8.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>tk.mybatis</groupId>
        <artifactId>mapper-spring-boot-starter</artifactId>
        <version>2.1.5</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- RocketMQ integration for Spring Boot (RocketMQTemplate, listener annotations) -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>
    <!-- ...... (remaining dependencies omitted) -->
</dependencies>
RocketMQ Producer
添加配置
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    # A producer group must be specified
    group: test-group
编写producer代码
@Service
@Slf4j
public class ShareService {
@Autowired
private ShareMapper shareMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
public ShareDTO auditById(Integer id, ShareAuditDTO auditDTO){
//1、查询Share是否存在,不存在或者当前的audit_status != NOT_YET就抛异常
Share share = shareMapper.selectByPrimaryKey(id);
//3、如果是PASS,那么就发送消息到rocketmq,让用户中心去消费,并为发布人添加积分
this.rocketMQTemplate.convertAndSend("add-bonus",
UserAddBonusMsgDTO.builder()
.userId(share.getUserId())
.bonus(50).build());
return shareDTO;
}
}
RocketMQ Consumer
添加配置
rocketmq:
  name-server: 127.0.0.1:9876
编写consumer代码
/**
 * RocketMQ listener subscribed to the {@code add-bonus} topic under consumer
 * group {@code consumer-group}. Each received payload is forwarded to
 * {@link UserService#addBonus}.
 */
@Component
@RocketMQMessageListener(topic = "add-bonus", consumerGroup = "consumer-group")
@Slf4j
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {

    @Autowired
    private UserService userService;

    /**
     * Runs the bonus business logic for one received message.
     *
     * @param message the "add bonus" payload taken from the topic
     */
    @Override
    public void onMessage(UserAddBonusMsgDTO message) {
        this.userService.addBonus(message);
    }
}
/**
 * User-center service that applies bonus changes carried by "add bonus"
 * messages.
 */
@Service
@Slf4j
public class UserService {

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private BonusEventLogMapper bonusEventLogMapper;

    /**
     * Adds the bonus carried by the message to the user's balance and records
     * the change in the bonus event log. Both writes run in one transaction
     * that rolls back on any {@link Exception}.
     *
     * NOTE(review): this method does not check whether the same message was
     * already applied, so a redelivered message would add the bonus again.
     *
     * @param userAddBonusMsgDTO message payload holding the user id and the bonus to add
     * @throws IllegalArgumentException if no user exists for the given id
     */
    @Transactional(rollbackFor = Exception.class)
    public void addBonus(UserAddBonusMsgDTO userAddBonusMsgDTO) {
        Integer userId = userAddBonusMsgDTO.getUserId();
        Integer bonus = userAddBonusMsgDTO.getBonus();

        // 1. Add the bonus to the user's current balance.
        User user = userMapper.selectByPrimaryKey(userId);
        if (user == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new IllegalArgumentException("参数非法!该用户不存在,userId=" + userId);
        }
        user.setBonus(user.getBonus() + bonus);
        userMapper.updateByPrimaryKeySelective(user);

        // 2. Record the change in the bonus_event_log table.
        BonusEventLog bonusEventLog = BonusEventLog.builder()
                .userId(userId)
                .value(bonus)
                .event("CONTRIBUTE")
                .createTime(new Date())
                .description("投稿加积分")
                .build();
        bonusEventLogMapper.insertSelective(bonusEventLog);

        log.info("积分添加完毕。。。。");
    }
}
RocketMQ基于Spring编程模型的事务消息收发
编写事务消息Producer代码
/**
 * Audits shares. An approval is carried out through a RocketMQ transactional
 * message so that the local status update and the "add bonus" message are
 * committed or rolled back together; a rejection only updates the database.
 */
@Service
@Slf4j
public class ShareService {

    @Autowired
    private ShareMapper shareMapper;

    @Autowired
    private UserCenterFeignClient userCenterFeignClient;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * Audits the share identified by {@code id}.
     *
     * <p>PASS: sends a half (prepare) message to topic {@code add-bonus} in
     * producer group {@code tx-add-bonus-group}; the database update is left
     * to the transaction listener registered for that group.
     * REJECT: updates the database directly.
     *
     * <p>NOTE(review): the returned DTO is copied from the entity loaded
     * before the audit, so it carries the pre-audit status and reason.
     *
     * @param id       primary key of the share being audited
     * @param auditDTO requested audit result and reason
     * @return a DTO copied from the share as it was loaded
     * @throws IllegalArgumentException if the share does not exist, was already
     *                                  audited, or the requested status is
     *                                  neither PASS nor REJECT
     */
    public ShareDTO auditById(Integer id, ShareAuditDTO auditDTO) {
        // 1. Load the share; reject the call if it is missing or was already audited.
        Share share = shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("参数非法!该分享不存在");
        }
        if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
            throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过");
        }

        // 3. On PASS, publish a message for the user center to credit the publisher.
        //    Crediting is asynchronous so that this audit endpoint responds quickly.
        if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
            String transactionId = UUID.randomUUID().toString();
            // Send the half message.
            rocketMQTemplate.sendMessageInTransaction(
                    "tx-add-bonus-group",
                    "add-bonus",
                    MessageBuilder.withPayload(
                                    UserAddBonusMsgDTO.builder()
                                            .userId(share.getUserId())
                                            .bonus(50)
                                            .build())
                            // Headers carry what the transaction listener needs.
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                            // The listener reads this header as a String, so send it as one.
                            .setHeader("share_id", String.valueOf(id))
                            .build(),
                    // Last argument is the arg object handed to the transaction listener.
                    auditDTO);
        } else if (AuditStatusEnum.REJECT.equals(auditDTO.getAuditStatusEnum())) {
            // NOTE(review): invoked via `this`, so a proxy-based @Transactional on
            // auditBYIdInDB would not apply on this path; it is a single update.
            this.auditBYIdInDB(id, auditDTO);
        } else {
            // Previously any other status fell through and returned as if the audit succeeded.
            throw new IllegalArgumentException("参数非法!审核状态必须为PASS或REJECT");
        }

        ShareDTO shareDTO = new ShareDTO();
        BeanUtils.copyProperties(share, shareDTO);
        return shareDTO;
    }

    /**
     * 2. Persists the audit result (status and reason) for the given share.
     *
     * @param id       primary key of the share
     * @param auditDTO audit result and reason to store
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditBYIdInDB(Integer id, ShareAuditDTO auditDTO) {
        Share share = Share.builder()
                .id(id)
                .auditStatus(auditDTO.getAuditStatusEnum().toString())
                .reason(auditDTO.getReason())
                .build();
        shareMapper.updateByPrimaryKeySelective(share);
    }

    /**
     * Persists the audit result and, in the same transaction, writes a
     * transaction-log row keyed by {@code transactionId}.
     *
     * @param id            primary key of the share
     * @param auditDTO      audit result and reason to store
     * @param transactionId id stored in the transaction log row
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditBYIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
        this.auditBYIdInDB(id, auditDTO);
        RocketmqTransactionLog rocketmqTransactionLog = RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .log("审核分享...")
                .build();
        rocketmqTransactionLogMapper.insertSelective(rocketmqTransactionLog);
    }
}
创建事务消息Producer的事务监听器
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private ShareService shareService;
@Autowired
private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
* 发送prepare消息成功此方法被回调,该方法用于执行本地事务
*
* @param message 回传的消息,利用transactionId即可获取到该消息的唯一Id
* @param auditDTO 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
* @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object auditDTO) {
MessageHeaders headers = message.getHeaders();
String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = Integer.parseInt((String)headers.get("share_id"));
try {
shareService.auditBYIdWithRocketMqLog(shareId,(ShareAuditDTO)auditDTO,transactionId);
//本地事物成功,执行commit
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事物执行异常,e={}",e);
//本地事物失败,执行rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 实现本地事务回查的逻辑,并返回本地事务执行状态
* @param message 通过获取transactionId来判断这条消息的本地事务执行状态
* @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);
RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog
.builder().transactionId(transactionId).build());
if(rocketmqTransactionLog == null){
log.error("如果本地事物日志没有记录,transactionId={}",transactionId);
//本地事物失败,执行rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
//如果本地事物日志有记录,执行commit
return RocketMQLocalTransactionState.COMMIT;
}
}
事务消息consumer代码和非事务消息编码一致
/**
 * RocketMQ listener subscribed to the {@code add-bonus} topic under consumer
 * group {@code consumer-group}. Each received payload is forwarded to
 * {@link UserService#addBonus}.
 */
@Component
@RocketMQMessageListener(topic = "add-bonus", consumerGroup = "consumer-group")
@Slf4j
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {

    @Autowired
    private UserService userService;

    /**
     * Runs the bonus business logic for one received message.
     *
     * @param message the "add bonus" payload taken from the topic
     */
    @Override
    public void onMessage(UserAddBonusMsgDTO message) {
        this.userService.addBonus(message);
    }
}
/**
 * User-center service that applies bonus changes carried by "add bonus"
 * messages.
 */
@Service
@Slf4j
public class UserService {

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private BonusEventLogMapper bonusEventLogMapper;

    /**
     * Adds the bonus carried by the message to the user's balance and records
     * the change in the bonus event log. Both writes run in one transaction
     * that rolls back on any {@link Exception}.
     *
     * NOTE(review): this method does not check whether the same message was
     * already applied, so a redelivered message would add the bonus again.
     *
     * @param userAddBonusMsgDTO message payload holding the user id and the bonus to add
     * @throws IllegalArgumentException if no user exists for the given id
     */
    @Transactional(rollbackFor = Exception.class)
    public void addBonus(UserAddBonusMsgDTO userAddBonusMsgDTO) {
        Integer userId = userAddBonusMsgDTO.getUserId();
        Integer bonus = userAddBonusMsgDTO.getBonus();

        // 1. Add the bonus to the user's current balance.
        User user = userMapper.selectByPrimaryKey(userId);
        if (user == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new IllegalArgumentException("参数非法!该用户不存在,userId=" + userId);
        }
        user.setBonus(user.getBonus() + bonus);
        userMapper.updateByPrimaryKeySelective(user);

        // 2. Record the change in the bonus_event_log table.
        BonusEventLog bonusEventLog = BonusEventLog.builder()
                .userId(userId)
                .value(bonus)
                .event("CONTRIBUTE")
                .createTime(new Date())
                .description("投稿加积分")
                .build();
        bonusEventLogMapper.insertSelective(bonusEventLog);

        log.info("积分添加完毕。。。。");
    }
}