这里的分布式指的是基于服务的柔性事务,柔性事务必须满足4大项:
1、可查询操作:服务操作具有全局唯一的标识,操作唯一的确定的时间。
2、幂等操作:重复调用多次产生的业务结果与调用一次产生的结果相同。一是通过业务操作实现幂等性,二是系统缓存所有请求与处理的结果,最后是检测到重复请求之后,自动返回之前的处理结果。
3、TCC操作:Try阶段,尝试执行业务,完成所有业务的检查,实现一致性;预留必须的业务资源,实现准隔离性。Confirm阶段:真正的去执行业务,不做任何检查,仅适用Try阶段预留的业务资源,Confirm操作还要满足幂等性。Cancel阶段:取消执行业务,释放Try阶段预留的业务资源,Cancel操作要满足幂等性。TCC与2PC(两阶段提交)协议的区别:TCC位于业务服务层而不是资源层,TCC没有单独准备阶段,Try操作兼备资源操作与准备的能力,TCC中Try操作可以灵活的选择业务资源,锁定粒度。TCC的开发成本比2PC高。实际上TCC也属于两阶段操作,但是TCC不等同于2PC操作。
4、可补偿操作:Do阶段:真正的执行业务处理,业务处理结果外部可见。Compensate阶段:抵消或者部分撤销正向业务操作的业务结果,补偿操作满足幂等性。约束:补偿操作在业务上可行,由于业务执行结果未隔离或者补偿不完整带来的风险与成本可控。实际上,TCC的Confirm和Cancel操作可以看做是补偿操作。
分布式事务解决方案有很多,下面是我在空闲时间,基于RabbitMQ的分布式事务解决方案,我把它称之为消息驱动。
1、消息驱动可以干什么
答:可用于异步事务,分布式服务调用等
2、消息驱动需要注意的是什么
答:兼容本地事务,事务检查,消息发送失败自动补偿,消费者业务方免幂等操作,免事务操作
3、本消息驱动缺点是什么
答:违反设计模式中最少知道原则,并且有表侵入问题,消息表必须跟业务表在一起,正在努力改进中
在基于MQ解决分布式事务或者异步操作时,最大的问题无非就三点
1、 routingKey 管理困难、队列爆炸。解决方案:每个服务只用一个routingKey,消费方使用统一的处理器,我们把处理器当观察者,发送方发送消息时,只要带上处理器的名字,消费方对应处理器就执行此消息。
2、发送失败补偿。解决方案:发送方发送时消息驱动自动存储消息,发送失败后自动重复发送
3、消费失败重新消费,重复消费幂等问题。解决方案:消费方消费前自动存储待消费的消息,消费成功修改当前消息状态,消费失败自动调用消费方业务重新消费,只到成功为止。
设计概要:
整体思想基于经典MQ事务处理的二次封装,其中发送、接收消息表会侵入业务,这里使用消息中间件是 RabbitMQ。你也可以替换成RocketMQ。每个服务固定一个队列专门用来接收消息驱动发过来的消息,如果你的服务只发送不接收消息,那么只需要建立发送表就可以,如果即发送又接收那么两个表都要建,只接收消息可以只建立接收表。消费端,每种业务处理都是一个处理器,必须实现处理器ReceiveHandle接口,业务处理器自动会成为伪观察者监听消息。
模型图如下:
1、主业务方处理完,写数据库
2、主业务方调用消息驱动
3、消息驱动先检查事务,无事务则抛异常,然后向DB写发送数据
4、异步发送,发送成功失败不影响业务操作
5、消息确认
6、修改发送记录确认成功或失败(定时任务重试发送失败消息)
7、接收消息
8、写接收消息
9、开启事务调用业务方
10、业务方写DB
11、消息驱动修改消息处理成功(定时任务处理初始状态消息)
12、消息回执ACK
相关实现
结构
发送消息表和接收消息表 sql
CREATE TABLE `cs_receive_message` (
`message_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息类型',
`business_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '业务类型',
`business_id` bigint(19) NULL DEFAULT NULL COMMENT '业务订单号',
`data` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据',
`gmt_receive` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '接收时间',
`handle_state` tinyint(4) NULL DEFAULT NULL COMMENT '状态',
`gmt_create` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',
`gmt_modify` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改时间',
PRIMARY KEY (`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
CREATE TABLE `cs_send_message` (
`message_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息类型',
`business_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '业务类型',
`business_id` bigint(19) NULL DEFAULT NULL COMMENT '业务订单号',
`exchange` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '路由',
`routing_key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '路由策略',
`data` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据',
`gmt_send` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '发送时间',
`gmt_confirm` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '确认时间',
`message_state` tinyint(4) NULL DEFAULT NULL COMMENT '状态',
`gmt_create` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',
`gmt_modify` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改时间',
PRIMARY KEY (`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
消息接收实体类
@Data
public class ReceiveMessage implements Serializable {
/** 消息类型 */
private String messageId;
/** 业务类型 */
private String businessType;
/** 业务订单号 */
private Long businessId;
/** 数据 */
private String data;
/** 接收时间 */
private Date gmtReceive;
/** 状态 */
private Integer handleState;
/** 创建时间 */
private Date gmtCreate;
/** 修改时间 */
private Date gmtModify;
private static final long serialVersionUID = 1L;
}
消息发送实体类
@Data
public class SendMessage implements Serializable {
/** 消息类型 */
private String messageId;
/** 业务类型 */
private String businessType;
/** 业务订单号 */
private Long businessId;
/** 路由 */
private String exchange;
/** 路由策略 */
private String routingKey;
/** 数据 */
private String data;
/** 发送时间 */
private Date gmtSend;
/** 确认时间 */
private Date gmtConfirm;
/** 状态 */
private Integer messageState;
/** 创建时间 */
private Date gmtCreate;
/** 修改时间 */
private Date gmtModify;
private static final long serialVersionUID = 1L;
}
消息处理状态枚举类
public enum ReceiveHandleStateEnum {
/** 初始 */
INIT(0, "初始"),
/** 处理中 */
PROCESSING(5, "处理中"),
/** 处理成功 */
PROCESS_SUCCESS(10, "处理成功");
/** 编码 */
private Integer code;
/** 说明 */
private String describe;
ReceiveHandleStateEnum(Integer code, String describe) {
this.code = code;
this.describe = describe;
}
public Integer getCode() {
return code;
}
public String getDescribe() {
return describe;
}
}
消息发送状态枚举类
public enum SendHandleStateEnum {
/** 初始 */
INIT(0, "初始"),
/** 待确认 */
WAITING_CONFIRM(5, "待确认"),
/** 发送成功 */
SEND_SUCCESS(10, "发送成功"),
/** 发送失败 */
SEND_FAIL(15, "发送失败");
/** 编码 */
private Integer code;
/** 说明 */
private String describe;
SendHandleStateEnum(Integer code, String describe) {
this.code = code;
this.describe = describe;
}
public Integer getCode() {
return code;
}
public String getDescribe() {
return describe;
}
}
接收处理Mapper配置文件,忽略公共部分,公共部分请用mybatis工具生成
<!-- 根据业务订单和查询接收消息列表 -->
<select id="findByBusinessId" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from cs_receive_message
where business_id = #{businessId,jdbcType=BIGINT}
</select>
<!-- 查询未处理的消息 -->
<select id="findOverdueOrder" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from cs_receive_message
WHERE
<![CDATA[
gmt_create <= #{endGmtCreate,jdbcType=VARCHAR}
]]>
AND handle_state in
<foreach item="item" collection="receiveStateList" separator="," open="(" close=")" index="">
#{item,jdbcType=INTEGER}
</foreach>
</select>
发送处理Mapper配置文件
<!-- 根据业务订单号查询列表-->
<select id="findByBusinessId" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from cs_send_message
where business_id = #{businessId,jdbcType=BIGINT}
</select>
<select id="findOverdueOrder" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from cs_send_message
WHERE
<![CDATA[
gmt_create <= #{endGmtCreate,jdbcType=VARCHAR}
]]>
AND message_state in
<foreach item="item" collection="sendStateList" separator="," open="(" close=")" index="">
#{item,jdbcType=INTEGER}
</foreach>
</select>
接收处理Dao 类
public interface ReceiveMessageMapper extends BaseMapper<ReceiveMessage, String> {
/**
* 根据业务订单和查询接收消息列表
* @param businessId 业务ID
* @return
*/
List<ReceiveMessage> findByBusinessId(Long businessId);
/**
* 根据状态查询创建时间小于等于endGmtCreate
* @param receiveStateList 接收状态
* @param endGmtCreate 完成时间
* @return
*/
List<ReceiveMessage> findOverdueOrder(@Param(value = "receiveStateList") List<Integer> receiveStateList, @Param(value = "endGmtCreate") String endGmtCreate);
}
发送处理Dao类
public interface SendMessageMapper extends BaseMapper<SendMessage, String> {
/**
* 根据业务订单号查询列表
* @param businessId 业务ID
* @return
*/
List<SendMessage> findByBusinessId(Long businessId);
/**
* 根据状态查询创建时间小于等于endGmtCreate
* @param sendStateList 发送状态
* @param endGmtCreate 完成时间
* @return
*/
List<SendMessage> findOverdueOrder(@Param(value = "sendStateList") List<Integer> sendStateList, @Param(value = "endGmtCreate") String endGmtCreate);
}
发送Service类
@Slf4j
@Service
public class SendMessageService {
@Resource
private SendMessageMapper sendMessageMapper;
public void insert(SendMessage sendMessage) {
if (sendMessage == null || StringUtils.isEmpty(sendMessage.getMessageId())) {
throw new RuntimeException("参数错误");
}
int total = sendMessageMapper.insertSelective(sendMessage);
if (total <= 0) {
throw new RuntimeException("系统异常");
}
}
public void update(SendMessage sendMessage) {
if (sendMessage == null || StringUtils.isEmpty(sendMessage.getMessageId())) {
throw new RuntimeException("参数错误");
}
sendMessageMapper.updateByPrimaryKeySelective(sendMessage);
}
public SendMessage getByMessageId(String messageId) {
if (StringUtils.isEmpty(messageId)){
return null;
}
return sendMessageMapper.selectByPrimaryKey(messageId);
}
public List<SendMessage> findByBusinessId(Long businessId) {
if (businessId == null){
return Collections.emptyList();
}
return sendMessageMapper.findByBusinessId(businessId);
}
public void updateMessageState(String messageId, Integer messageState) {
if (StringUtils.isEmpty(messageId) || messageState == null) {
throw new RuntimeException("参数错误");
}
SendMessage sendMessageUpdate = new SendMessage();
sendMessageUpdate.setMessageId(messageId);
sendMessageUpdate.setMessageState(messageState);
sendMessageUpdate.setGmtConfirm(new Date());
update(sendMessageUpdate);
}
public List<SendMessage> findOverdueOrder(List<Integer> sendStateList, String endGmtCreate) {
if (sendStateList.isEmpty() || StringUtils.isEmpty(endGmtCreate)){
return Collections.emptyList();
}
return sendMessageMapper.findOverdueOrder(sendStateList, endGmtCreate);
}
}
接收Service类
@Slf4j
@Service
public class ReceiveMessageService {
@Resource
private ReceiveMessageMapper receiveMessageMapper;
public void insert(ReceiveMessage receiveMessage) {
if (receiveMessage == null || StringUtils.isEmpty(receiveMessage.getMessageId())) {
throw new RuntimeException("参数错误");
}
int total = receiveMessageMapper.insertSelective(receiveMessage);
if (total <= 0) {
throw new RuntimeException("系统异常");
}
}
public void update(ReceiveMessage receiveMessage) {
if (receiveMessage == null || StringUtils.isEmpty(receiveMessage.getMessageId())) {
throw new RuntimeException("参数错误");
}
receiveMessageMapper.updateByPrimaryKeySelective(receiveMessage);
}
public ReceiveMessage getByMessageId(String messageId) {
if (StringUtils.isEmpty(messageId)) {
return null;
}
return receiveMessageMapper.selectByPrimaryKey(messageId);
}
public List<ReceiveMessage> findByBusinessId(Long businessId) {
if (businessId == null) {
return Collections.emptyList();
}
return receiveMessageMapper.findByBusinessId(businessId);
}
public void updateHandleState(String messageId, Integer handleState) {
if (StringUtils.isEmpty(messageId) || handleState == null) {
throw new RuntimeException("参数错误");
}
ReceiveMessage receiveMessageUpdate = new ReceiveMessage();
receiveMessageUpdate.setMessageId(messageId);
receiveMessageUpdate.setHandleState(handleState);
update(receiveMessageUpdate);
}
public List<ReceiveMessage> findOverdueOrder(List<Integer> receiveStateList, String endGmtCreate) {
if (receiveStateList.isEmpty() || StringUtils.isEmpty(endGmtCreate)){
return Collections.emptyList();
}
return receiveMessageMapper.findOverdueOrder(receiveStateList, endGmtCreate);
}
}
发送工具类
@Slf4j
@Component
public class SendExecute {
@Autowired
private SendMessageService sendMessageService;
@Autowired
@Qualifier("driveRabbitTemplate")
private RabbitTemplate driveRabbitTemplate;
@Transactional(rollbackFor = Exception.class)
public void execute(Long businessId, String businessType, String routingKey, String date) {
String messageId = StringUtil.randomGUID();
SendMessage message = new SendMessage();
message.setMessageId(messageId);
message.setBusinessId(businessId);
message.setBusinessType(businessType);
message.setData(date);
message.setExchange(driveRabbitTemplate.getExchange());
message.setRoutingKey(routingKey);
message.setGmtSend(new Date());
message.setMessageState(SendHandleStateEnum.WAITING_CONFIRM.getCode());
sendMessageService.insert(message);
Home("开始异步发送消息{}", message);
sendMessage(message);
}
/**
* 发送消息(这里用public只是为了事务)
* @param message
*/
@Async("sendPool")
public void sendMessage(SendMessage message) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(message.getMessageId());
driveRabbitTemplate.convertAndSend(message.getRoutingKey(),
(Object) JSONObject.toJSONString(message), correlationData);
Home("异步消息发送成功{}", message);
}
}
异步线程池配置
@Configuration
public class DriveAsyncConfig {
@Bean(name = "sendPool")
public ThreadPoolExecutor withdrawAuditPool() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(128), new ThreadPoolExecutor.DiscardPolicy());
return threadPoolExecutor;
}
}
接收业务处理器接口
public interface ReceiveHandle {
/**
* 获取处理器名称
* @return
*/
String getName();
/**
* 业务处理
* @param data 业务数据
*/
void execute(String data);
}
接收执行器
@Slf4j
@Service
public class ReceiveExecute {
@Autowired
private RedisLock redisLock;
@Autowired
private ReceiveMessageService receiveMessageService;
@Autowired
private ApplicationContext applicationContext;
/** 接收处理类 */
private volatile Map<String, ReceiveHandle> handleMap;
/** 处理锁名称 */
private static final String RECEIVE_EXECUTE_LOCK = "RECEIVE:EXECUTE:LOCK:";
/**
* 通过类型获取接收处理类
* @param businessType
* @return
*/
private ReceiveHandle getReceiveHandle(String businessType) {
if (CollectionUtils.isEmpty(handleMap)) {
synchronized (this) {
if (CollectionUtils.isEmpty(handleMap)) {
// 获取所有实现类型
Map<String, ReceiveHandle> beans = applicationContext.getBeansOfType(ReceiveHandle.class);
if (CollectionUtils.isEmpty(beans)) {
return null;
}
// 初始化map
handleMap = beans.values().stream().collect(Collectors.toMap(ReceiveHandle::getName, x -> x));
}
}
}
return handleMap.get(businessType);
}
/**
* 业务处理操作
* @param messageId
* @param businessType
*/
@Transactional(rollbackFor = Exception.class)
public void execute(String messageId, String businessType) {
Home("消息驱动开始业务处理 参数 messageId{} businessType{}", messageId, businessType);
// 验证参数
if (StringUtils.isEmpty(messageId) || StringUtils.isEmpty(businessType)) {
return;
}
// 开始调用处理
ReceiveHandle receiveHandle = getReceiveHandle(businessType);
if (receiveHandle == null) {
return;
}
redisLock.tryLock(RECEIVE_EXECUTE_LOCK, messageId, () -> {
// 必须是待处理状态
ReceiveMessage receiveMessageSelect = receiveMessageService.getByMessageId(messageId);
if (receiveMessageSelect == null) {
throw new RuntimeException("消息" + messageId + "不存在");
}
Home("消息驱动业务处理 messageId{}当前状态{}", messageId, receiveMessageSelect.getHandleState());
// 定时任务会查初始状态数据并调用此方执行
if (!ReceiveHandleStateEnum.INIT.getCode().equals(receiveMessageSelect.getHandleState())) {
return;
}
// 修改消息处理中
receiveMessageService.updateHandleState(receiveMessageSelect.getMessageId(), ReceiveHandleStateEnum.PROCESSING.getCode());
Home("驱动开始业务处理 参数{}", receiveMessageSelect);
// 处理业务
receiveHandle.execute(receiveMessageSelect.getData());
Home("驱动结束业务处理 参数{}", receiveMessageSelect);
// 修改消息处理成功
receiveMessageService.updateHandleState(receiveMessageSelect.getMessageId(), ReceiveHandleStateEnum.PROCESS_SUCCESS.getCode());
});
Home("消息驱动结束业务处理");
}
}
MQ监听器
@Slf4j
@Service
public class ReceiveListener implements ChannelAwareMessageListener {
@Autowired
private RedisLock redisLock;
@Autowired
private ReceiveExecute receiveExecute;
@Autowired
private ReceiveMessageService receiveMessageService;
/** 写库锁名称 */
private static final String RECEIVE_WRITING_DATA_LOCK = "RECEIVE:WRITING:DATA:LOCK:";
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Home("消息驱动接收消息 - 数据{}", message.getBody());
if (message == null || StringUtils.isEmpty(message.getBody())) {
return;
}
// 获取发送数据
String json = new String(message.getBody(), StandardCharsets.UTF_8);
Home("消息驱动接收消息 - 数据{}", json);
SendMessage sendMessage = JSONObject.parseObject(json, SendMessage.class);
if (sendMessage == null) {
// 拒绝并且丢弃
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
return;
}
redisLock.tryLock(RECEIVE_WRITING_DATA_LOCK, sendMessage.getMessageId(), () -> {
// 是否接收过,已经接收过直接确认
ReceiveMessage receiveMessageSelect = receiveMessageService.getByMessageId(sendMessage.getMessageId());
if (receiveMessageSelect != null) {
Home("消息驱动接收消息 - 重复消息 ID{}", sendMessage.getMessageId());
return;
}
// 消息入库
ReceiveMessage receiveMessage = new ReceiveMessage();
receiveMessage.setMessageId(sendMessage.getMessageId());
receiveMessage.setBusinessType(sendMessage.getBusinessType());
receiveMessage.setBusinessId(sendMessage.getBusinessId());
receiveMessage.setData(sendMessage.getData());
receiveMessage.setGmtReceive(new Date());
receiveMessage.setHandleState(ReceiveHandleStateEnum.INIT.getCode());
receiveMessageService.insert(receiveMessage);
Home("消息驱动接收消息 - 消息写入DB成功{}", receiveMessage);
});
// 调用业务执行
receiveExecute.execute(sendMessage.getMessageId(), sendMessage.getBusinessType());
// 消息确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
驱动管理器
public interface DriveManage {
/**
* 发送消息
* @param businessId 业务ID
* @param businessType 业务类型
* @param date 业务数据
*/
void send(Long businessId, String businessType, String date);
/**
* 发送消息
* @param businessId 业务ID
* @param businessType 业务类型
* @param routingKey 路由key
* @param date 业务数据
*/
void send(Long businessId, String businessType, String routingKey, String date);
}
驱动实现类
@Slf4j
@Component
@EnableScheduling
public class DriveManageImpl implements DriveManage {
@Autowired
private SendMessageService sendMessageService;
@Autowired
private ReceiveMessageService receiveMessageService;
@Lazy
@Autowired
private SendExecute sendExecute;
@Lazy
@Autowired
private ReceiveExecute receiveExecute;
/** 驱动 - 消息队列名称(必须为每个业务系统建立一个队列) */
@Value("${drive.message.queue.name}")
private String messageQueueName;
/** 默认exchange */
@Value("${spring.rabbitmq.template.exchange}")
private String exchange;
/** 驱动 - 本服务routingKey */
@Value("${drive.message.routingKey}")
private String routingKey;
@Bean
public RabbitTemplate driveRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate driveRabbitTemplate = new RabbitTemplate(connectionFactory);
driveRabbitTemplate.setExchange(exchange);
driveRabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 修改消息发送成功
sendMessageService.updateMessageState(correlationData.getId(), SendHandleStateEnum.SEND_SUCCESS.getCode());
Home("drive消息确认 - 发送成功 ID{}", correlationData.getId());
} else {
sendMessageService.updateMessageState(correlationData.getId(), SendHandleStateEnum.SEND_FAIL.getCode());
log.error("drive消消息确认 - 发送失败 ID{}", correlationData.getId());
}
});
return driveRabbitTemplate;
}
@Override
public void send(Long businessId, String businessType, String date) {
sendExecute.execute(businessId, businessType, routingKey, date);
}
@Override
public void send(Long businessId, String businessType, String routingKey, String date) {
sendExecute.execute(businessId, businessType, routingKey, date);
}
@Bean
public MessageListenerContainer openAccountListenerContainer(ConnectionFactory connectionFactory, ReceiveListener receiveListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(messageQueueName);
container.setMessageListener(receiveListener);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
/**
* 同步发送记录
*/
@Scheduled(cron = "0 0/1 * * * ?")
public void synSend() {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, -1);
String endGmtCreate = new SimpleDateFormat(DateConstant.DATETIME_FORMAT).format(calendar.getTime());
// 查询一分钟之前发送状态为待确认的
List<Integer> sendStateList = new ArrayList<>();
sendStateList.add(SendHandleStateEnum.SEND_FAIL.getCode());
sendStateList.add(SendHandleStateEnum.WAITING_CONFIRM.getCode());
List<SendMessage> sendMessagesListSelect = sendMessageService.findOverdueOrder(sendStateList, endGmtCreate);
if (sendMessagesListSelect.isEmpty()) {
return;
}
// 调用发送执行方法
for (SendMessage sendMessage : sendMessagesListSelect) {
sendExecute.sendMessage(sendMessage);
}
}
/**
* 同步接收记录
*/
@Scheduled(cron = "0 0/1 * * * ?")
public void synReceive() {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, -1);
String endGmtCreate = new SimpleDateFormat(DateConstant.DATETIME_FORMAT).format(calendar.getTime());
// 查询一分钟之前发送状态为初始
List<Integer> receiveStateList = new ArrayList<>();
receiveStateList.add(ReceiveHandleStateEnum.INIT.getCode());
List<ReceiveMessage> receiveMessageList = receiveMessageService.findOverdueOrder(receiveStateList, endGmtCreate);
if (receiveMessageList.isEmpty()) {
return;
}
// 调用接收执行方法
for (ReceiveMessage receiveMessage : receiveMessageList) {
receiveExecute.execute(receiveMessage.getMessageId(), receiveMessage.getBusinessType());
}
}
}
业务实现例子
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Starter.class)
public class MainTest {
@Autowired
private DriveManage driveManage;
public String testSend() {
return driveManage.send(0L, "order.pay.result", "支付成功");
return driveManage.send(1L, "order.create.result", "下单成功");
}
}
@Slf4j
@Service
public class Receive1 implements ReceiveHandle {
@Override
public String getName() {
return "order.pay.result";
}
@Override
public void execute(String data) {
Home("驱动业务实现{},接收参数{}", getName(), data);
Home("订单支付成功");
}
}
@Slf4j
@Service
public class Receive2 implements ReceiveHandle {
@Override
public String getName() {
return "order.create.result";
}
@Override
public void execute(String data) {
Home("驱动业务实现{},接收参数{}", getName(), data);
Home("订单下单成");
}
}
引用方配置
drive:
message:
queue:
name: topic.transaction.drive.order
routingKey: order.transaction.orde