基于RabbitMQ解决分布式事务那点事


这里的分布式指的是基于服务的柔性事务,柔性事务必须满足4大项:
1、可查询操作:服务操作具有全局唯一的标识,操作唯一的确定的时间。
2、幂等操作:重复调用多次产生的业务结果与调用一次产生的结果相同。一是通过业务操作实现幂等性,二是系统缓存所有请求与处理的结果,最后是检测到重复请求之后,自动返回之前的处理结果。
3、TCC操作:Try阶段,尝试执行业务,完成所有业务的检查,实现一致性;预留必须的业务资源,实现准隔离性。Confirm阶段:真正的去执行业务,不做任何检查,仅适用Try阶段预留的业务资源,Confirm操作还要满足幂等性。Cancel阶段:取消执行业务,释放Try阶段预留的业务资源,Cancel操作要满足幂等性。TCC与2PC(两阶段提交)协议的区别:TCC位于业务服务层而不是资源层,TCC没有单独准备阶段,Try操作兼备资源操作与准备的能力,TCC中Try操作可以灵活的选择业务资源,锁定粒度。TCC的开发成本比2PC高。实际上TCC也属于两阶段操作,但是TCC不等同于2PC操作。
4、可补偿操作:Do阶段:真正的执行业务处理,业务处理结果外部可见。Compensate阶段:抵消或者部分撤销正向业务操作的业务结果,补偿操作满足幂等性。约束:补偿操作在业务上可行,由于业务执行结果未隔离或者补偿不完整带来的风险与成本可控。实际上,TCC的Confirm和Cancel操作可以看做是补偿操作。

分布式事务解决方案有很多,下面是我在空闲时间,基于RabbitMQ的分布式事务解决方案,我把它称之为消息驱动。

1、消息驱动可以干什么
答:可用于异步事务,分布式服务调用等
2、消息驱动需要注意的是什么
答:兼容本地事务,事务检查,消息发送失败自动补偿,消费者业务方免幂等操作,免事务操作
3、本消息驱动缺点是什么
答:违反设计模式中最少知道原则,并且有表侵入问题,消息表必须跟业务表在一起,正在努力改进中

在基于MQ解决分布式事务或者异步操作时,最大的问题无非就三点
1、 routingKey 管理困难、队列爆炸。解决方案:每个服务只用一个routingKey,消费方使用统一的处理器,我们把处理器当观察者,发送方发送消息时,只要带上处理器的名字,消费方对应处理器就执行此消息。
2、发送失败补偿。解决方案:发送方发送时消息驱动自动存储消息,发送失败后自动重复发送

3、消费失败重新消费,重复消费幂等问题。解决方案:消费方消费前自动存储待消费的消息,消费成功修改当前消息状态,消费失败自动调用消费方业务重新消费,只到成功为止。

设计概要:
整体思想基于经典MQ事务处理的二次封装,其中发送、接收消息表会侵入业务,这里使用消息中间件是 RabbitMQ。你也可以替换成RocketMQ。每个服务固定一个队列专门用来接收消息驱动发过来的消息,如果你的服务只发送不接收消息,那么只需要建立发送表就可以,如果即发送又接收那么两个表都要建,只接收消息可以只建立接收表。消费端,每种业务处理都是一个处理器,必须实现处理器ReceiveHandle接口,业务处理器自动会成为伪观察者监听消息。

模型图如下:



1、主业务方处理完,写数据库
2、主业务方调用消息驱动
3、消息驱动先检查事务,无事务则抛异常,然后向DB写发送数据
4、异步发送,发送成功失败不影响业务操作
5、消息确认
6、修改发送记录确认成功或失败(定时任务重试发送失败消息)
7、接收消息
8、写接收消息
9、开启事务调用业务方
10、业务方写DB
11、消息驱动修改消息处理成功(定时任务处理初始状态消息)
12、消息回执ACK

相关实现

结构



发送消息表和接收消息表 sql

CREATE TABLE `cs_receive_message`  (
  `message_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息类型',
  `business_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '业务类型',
  `business_id` bigint(19) NULL DEFAULT NULL COMMENT '业务订单号',
  `data` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据',
  `gmt_receive` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '接收时间',
  `handle_state` tinyint(4) NULL DEFAULT NULL COMMENT '状态',
  `gmt_create` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',
  `gmt_modify` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改时间',
  PRIMARY KEY (`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
 
CREATE TABLE `cs_send_message`  (
  `message_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息类型',
  `business_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '业务类型',
  `business_id` bigint(19) NULL DEFAULT NULL COMMENT '业务订单号',
  `exchange` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '路由',
  `routing_key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '路由策略',
  `data` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据',
  `gmt_send` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '发送时间',
  `gmt_confirm` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '确认时间',
  `message_state` tinyint(4) NULL DEFAULT NULL COMMENT '状态',
  `gmt_create` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',
  `gmt_modify` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '修改时间',
  PRIMARY KEY (`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

消息接收实体类

@Data
public class ReceiveMessage implements Serializable {
    /** 消息类型 */
    private String messageId;
    /** 业务类型 */
    private String businessType;
    /** 业务订单号 */
    private Long businessId;
    /** 数据 */
    private String data;
    /** 接收时间 */
    private Date gmtReceive;
    /** 状态 */
    private Integer handleState;
    /** 创建时间 */
    private Date gmtCreate;
    /** 修改时间 */
    private Date gmtModify;
    private static final long serialVersionUID = 1L;
}

消息发送实体类

@Data
public class SendMessage implements Serializable {
    /** 消息类型 */
    private String messageId;
    /** 业务类型 */
    private String businessType;
    /** 业务订单号 */
    private Long businessId;
    /** 路由 */
    private String exchange;
    /** 路由策略 */
    private String routingKey;
    /** 数据 */
    private String data;
    /** 发送时间 */
    private Date gmtSend;
    /** 确认时间 */
    private Date gmtConfirm;
    /** 状态 */
    private Integer messageState;
    /** 创建时间 */
    private Date gmtCreate;
    /** 修改时间 */
    private Date gmtModify;
    private static final long serialVersionUID = 1L;
}

消息处理状态枚举类

public enum ReceiveHandleStateEnum {
    /** 初始 */
    INIT(0, "初始"),
    /** 处理中 */
    PROCESSING(5, "处理中"),
    /** 处理成功 */
    PROCESS_SUCCESS(10, "处理成功");
    /** 编码 */
    private Integer code;
    /** 说明 */
    private String describe;
    ReceiveHandleStateEnum(Integer code, String describe) {
        this.code = code;
        this.describe = describe;
    }
    public Integer getCode() {
        return code;
    }
    public String getDescribe() {
        return describe;
    }
}

消息发送状态枚举类

public enum SendHandleStateEnum {
    /** 初始 */
    INIT(0, "初始"),
    /** 待确认 */
    WAITING_CONFIRM(5, "待确认"),
    /** 发送成功 */
    SEND_SUCCESS(10, "发送成功"),
    /** 发送失败 */
    SEND_FAIL(15, "发送失败");
    /** 编码 */
    private Integer code;
    /** 说明 */
    private String describe;
    SendHandleStateEnum(Integer code, String describe) {
        this.code = code;
        this.describe = describe;
    }
    public Integer getCode() {
        return code;
    }
    public String getDescribe() {
        return describe;
    }
}

接收处理Mapper配置文件,忽略公共部分,公共部分请用mybatis工具生成

<!-- 根据业务订单和查询接收消息列表 -->
<select id="findByBusinessId" resultMap="BaseResultMap">
    select
    <include refid="Base_Column_List" />
    from cs_receive_message
    where business_id = #{businessId,jdbcType=BIGINT}
</select>
<!-- 查询未处理的消息 -->
<select id="findOverdueOrder" resultMap="BaseResultMap">
    select
    <include refid="Base_Column_List" />
    from cs_receive_message
    WHERE
    <![CDATA[
        gmt_create <= #{endGmtCreate,jdbcType=VARCHAR}
    ]]>
    AND handle_state in
    <foreach item="item" collection="receiveStateList" separator="," open="(" close=")" index="">
        #{item,jdbcType=INTEGER}
    </foreach>
</select>

发送处理Mapper配置文件

<!-- 根据业务订单号查询列表-->
<select id="findByBusinessId" resultMap="BaseResultMap">
    select
    <include refid="Base_Column_List" />
    from cs_send_message
    where business_id = #{businessId,jdbcType=BIGINT}
</select>
<select id="findOverdueOrder" resultMap="BaseResultMap">
    select
    <include refid="Base_Column_List" />
    from cs_send_message
    WHERE
    <![CDATA[
        gmt_create <= #{endGmtCreate,jdbcType=VARCHAR}
    ]]>
    AND message_state in
    <foreach item="item" collection="sendStateList" separator="," open="(" close=")" index="">
        #{item,jdbcType=INTEGER}
    </foreach>
</select>

接收处理Dao 类

public interface ReceiveMessageMapper extends BaseMapper<ReceiveMessage, String> {
    /**
     * 根据业务订单和查询接收消息列表
     * @param businessId 业务ID
     * @return
     */
    List<ReceiveMessage> findByBusinessId(Long businessId);
    /**
     * 根据状态查询创建时间小于等于endGmtCreate
     * @param receiveStateList 接收状态
     * @param endGmtCreate 完成时间
     * @return
     */
    List<ReceiveMessage> findOverdueOrder(@Param(value = "receiveStateList") List<Integer> receiveStateList, @Param(value = "endGmtCreate") String endGmtCreate);
}

发送处理Dao类

public interface SendMessageMapper extends BaseMapper<SendMessage, String> {
    /**
     * 根据业务订单号查询列表
     * @param businessId 业务ID
     * @return
     */
    List<SendMessage> findByBusinessId(Long businessId);
    /**
     * 根据状态查询创建时间小于等于endGmtCreate
     * @param sendStateList 发送状态
     * @param endGmtCreate 完成时间
     * @return
     */
    List<SendMessage> findOverdueOrder(@Param(value = "sendStateList") List<Integer> sendStateList, @Param(value = "endGmtCreate") String endGmtCreate);
}

发送Service类

@Slf4j
@Service
public class SendMessageService {
    @Resource
    private SendMessageMapper sendMessageMapper;
    public void insert(SendMessage sendMessage) {
        if (sendMessage == null || StringUtils.isEmpty(sendMessage.getMessageId())) {
            throw new RuntimeException("参数错误");
        }
        int total = sendMessageMapper.insertSelective(sendMessage);
        if (total <= 0) {
            throw new RuntimeException("系统异常");
        }
    }
    public void update(SendMessage sendMessage) {
        if (sendMessage == null || StringUtils.isEmpty(sendMessage.getMessageId())) {
            throw new RuntimeException("参数错误");
        }
        sendMessageMapper.updateByPrimaryKeySelective(sendMessage);
    }
    public SendMessage getByMessageId(String messageId) {
        if (StringUtils.isEmpty(messageId)){
            return null;
        }
        return sendMessageMapper.selectByPrimaryKey(messageId);
    }
    public List<SendMessage> findByBusinessId(Long businessId) {
        if (businessId == null){
            return Collections.emptyList();
        }
        return sendMessageMapper.findByBusinessId(businessId);
    }
    public void updateMessageState(String messageId, Integer messageState) {
        if (StringUtils.isEmpty(messageId) || messageState == null) {
            throw new RuntimeException("参数错误");
        }
        SendMessage sendMessageUpdate = new SendMessage();
        sendMessageUpdate.setMessageId(messageId);
        sendMessageUpdate.setMessageState(messageState);
        sendMessageUpdate.setGmtConfirm(new Date());
        update(sendMessageUpdate);
    }
    public List<SendMessage> findOverdueOrder(List<Integer> sendStateList, String endGmtCreate) {
        if (sendStateList.isEmpty() || StringUtils.isEmpty(endGmtCreate)){
            return Collections.emptyList();
        }
        return sendMessageMapper.findOverdueOrder(sendStateList, endGmtCreate);
    }
}

接收Service类

@Slf4j
@Service
public class ReceiveMessageService {
    @Resource
    private ReceiveMessageMapper receiveMessageMapper;
    public void insert(ReceiveMessage receiveMessage) {
        if (receiveMessage == null || StringUtils.isEmpty(receiveMessage.getMessageId())) {
            throw new RuntimeException("参数错误");
        }
        int total = receiveMessageMapper.insertSelective(receiveMessage);
        if (total <= 0) {
            throw new RuntimeException("系统异常");
        }
    }
    public void update(ReceiveMessage receiveMessage) {
        if (receiveMessage == null || StringUtils.isEmpty(receiveMessage.getMessageId())) {
            throw new RuntimeException("参数错误");
        }
        receiveMessageMapper.updateByPrimaryKeySelective(receiveMessage);
    }
    public ReceiveMessage getByMessageId(String messageId) {
        if (StringUtils.isEmpty(messageId)) {
            return null;
        }
        return receiveMessageMapper.selectByPrimaryKey(messageId);
    }
    public List<ReceiveMessage> findByBusinessId(Long businessId) {
        if (businessId == null) {
            return Collections.emptyList();
        }
        return receiveMessageMapper.findByBusinessId(businessId);
    }
    public void updateHandleState(String messageId, Integer handleState) {
        if (StringUtils.isEmpty(messageId) || handleState == null) {
            throw new RuntimeException("参数错误");
        }
        ReceiveMessage receiveMessageUpdate = new ReceiveMessage();
        receiveMessageUpdate.setMessageId(messageId);
        receiveMessageUpdate.setHandleState(handleState);
        update(receiveMessageUpdate);
    }
    public List<ReceiveMessage> findOverdueOrder(List<Integer> receiveStateList, String endGmtCreate) {
        if (receiveStateList.isEmpty() || StringUtils.isEmpty(endGmtCreate)){
            return Collections.emptyList();
        }
        return receiveMessageMapper.findOverdueOrder(receiveStateList, endGmtCreate);
    }
}

发送工具类

@Slf4j
@Component
public class SendExecute {
    @Autowired
    private SendMessageService sendMessageService;
    @Autowired
    @Qualifier("driveRabbitTemplate")
    private RabbitTemplate driveRabbitTemplate;
    @Transactional(rollbackFor = Exception.class)
    public void execute(Long businessId, String businessType, String routingKey, String date) {
        String messageId = StringUtil.randomGUID();
        SendMessage message = new SendMessage();
        message.setMessageId(messageId);
        message.setBusinessId(businessId);
        message.setBusinessType(businessType);
        message.setData(date);
        message.setExchange(driveRabbitTemplate.getExchange());
        message.setRoutingKey(routingKey);
        message.setGmtSend(new Date());
        message.setMessageState(SendHandleStateEnum.WAITING_CONFIRM.getCode());
        sendMessageService.insert(message);
        Home("开始异步发送消息{}", message);
        sendMessage(message);
    }
    /**
     * 发送消息(这里用public只是为了事务)
     * @param message
     */
    @Async("sendPool")
    public void sendMessage(SendMessage message) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(message.getMessageId());
        driveRabbitTemplate.convertAndSend(message.getRoutingKey(), 
                (Object) JSONObject.toJSONString(message), correlationData);
        Home("异步消息发送成功{}", message);
    }
}

异步线程池配置

@Configuration
public class DriveAsyncConfig {
    @Bean(name = "sendPool")
    public ThreadPoolExecutor withdrawAuditPool() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(128), new ThreadPoolExecutor.DiscardPolicy());
        return threadPoolExecutor;
    }
}

接收业务处理器接口

public interface ReceiveHandle {
    /**
     * 获取处理器名称
     * @return
     */
    String getName();
    /**
     * 业务处理
     * @param data 业务数据
     */
    void execute(String data);
}

接收执行器

@Slf4j
@Service
public class ReceiveExecute {
    @Autowired
    private RedisLock redisLock;
    @Autowired
    private ReceiveMessageService receiveMessageService;
    @Autowired
    private ApplicationContext applicationContext;
    /** 接收处理类 */
    private volatile Map<String, ReceiveHandle> handleMap;
    /** 处理锁名称 */
    private static final String RECEIVE_EXECUTE_LOCK = "RECEIVE:EXECUTE:LOCK:";
    /**
     * 通过类型获取接收处理类
     * @param businessType
     * @return
     */
    private ReceiveHandle getReceiveHandle(String businessType) {
        if (CollectionUtils.isEmpty(handleMap)) {
            synchronized (this) {
                if (CollectionUtils.isEmpty(handleMap)) {
                    // 获取所有实现类型
                    Map<String, ReceiveHandle> beans = applicationContext.getBeansOfType(ReceiveHandle.class);
                    if (CollectionUtils.isEmpty(beans)) {
                        return null;
                    }
                    // 初始化map
                    handleMap = beans.values().stream().collect(Collectors.toMap(ReceiveHandle::getName, x -> x));
                }
            }
        }
        return handleMap.get(businessType);
    }
    /**
     * 业务处理操作
     * @param messageId
     * @param businessType
     */
    @Transactional(rollbackFor = Exception.class)
    public void execute(String messageId, String businessType) {
        Home("消息驱动开始业务处理 参数 messageId{} businessType{}", messageId, businessType);
        // 验证参数
        if (StringUtils.isEmpty(messageId) || StringUtils.isEmpty(businessType)) {
            return;
        }
        // 开始调用处理
        ReceiveHandle receiveHandle = getReceiveHandle(businessType);
        if (receiveHandle == null) {
            return;
        }
        redisLock.tryLock(RECEIVE_EXECUTE_LOCK, messageId, () -> {
            // 必须是待处理状态
            ReceiveMessage receiveMessageSelect = receiveMessageService.getByMessageId(messageId);
            if (receiveMessageSelect == null) {
                throw new RuntimeException("消息" + messageId + "不存在");
            }
            Home("消息驱动业务处理 messageId{}当前状态{}", messageId, receiveMessageSelect.getHandleState());
            // 定时任务会查初始状态数据并调用此方执行
            if (!ReceiveHandleStateEnum.INIT.getCode().equals(receiveMessageSelect.getHandleState())) {
                return;
            }
            // 修改消息处理中
            receiveMessageService.updateHandleState(receiveMessageSelect.getMessageId(), ReceiveHandleStateEnum.PROCESSING.getCode());
            Home("驱动开始业务处理 参数{}", receiveMessageSelect);
            // 处理业务
            receiveHandle.execute(receiveMessageSelect.getData());
            Home("驱动结束业务处理 参数{}", receiveMessageSelect);
            // 修改消息处理成功
            receiveMessageService.updateHandleState(receiveMessageSelect.getMessageId(), ReceiveHandleStateEnum.PROCESS_SUCCESS.getCode());
        });
        Home("消息驱动结束业务处理");
    }
}

MQ监听器

@Slf4j
@Service
public class ReceiveListener implements ChannelAwareMessageListener {
    @Autowired
    private RedisLock redisLock;
    @Autowired
    private ReceiveExecute receiveExecute;
    @Autowired
    private ReceiveMessageService receiveMessageService;
    /**  写库锁名称 */
    private static final String RECEIVE_WRITING_DATA_LOCK = "RECEIVE:WRITING:DATA:LOCK:";
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Home("消息驱动接收消息 - 数据{}", message.getBody());
        if (message == null || StringUtils.isEmpty(message.getBody())) {
            return;
        }
        // 获取发送数据
        String json = new String(message.getBody(), StandardCharsets.UTF_8);
        Home("消息驱动接收消息 - 数据{}", json);
        SendMessage sendMessage = JSONObject.parseObject(json, SendMessage.class);
        if (sendMessage == null) {
            // 拒绝并且丢弃
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }
        redisLock.tryLock(RECEIVE_WRITING_DATA_LOCK, sendMessage.getMessageId(), () -> {
            // 是否接收过,已经接收过直接确认
            ReceiveMessage receiveMessageSelect = receiveMessageService.getByMessageId(sendMessage.getMessageId());
            if (receiveMessageSelect != null) {
                Home("消息驱动接收消息 - 重复消息 ID{}", sendMessage.getMessageId());
                return;
            }
            // 消息入库
            ReceiveMessage receiveMessage = new ReceiveMessage();
            receiveMessage.setMessageId(sendMessage.getMessageId());
            receiveMessage.setBusinessType(sendMessage.getBusinessType());
            receiveMessage.setBusinessId(sendMessage.getBusinessId());
            receiveMessage.setData(sendMessage.getData());
            receiveMessage.setGmtReceive(new Date());
            receiveMessage.setHandleState(ReceiveHandleStateEnum.INIT.getCode());
            receiveMessageService.insert(receiveMessage);
            Home("消息驱动接收消息 - 消息写入DB成功{}", receiveMessage);
        });
        // 调用业务执行
        receiveExecute.execute(sendMessage.getMessageId(), sendMessage.getBusinessType());
        // 消息确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

驱动管理器

public interface DriveManage {
    /**
     * 发送消息
     * @param businessId 业务ID
     * @param businessType 业务类型
     * @param date 业务数据
     */
    void send(Long businessId, String businessType, String date);
    /**
     * 发送消息
     * @param businessId 业务ID
     * @param businessType 业务类型
     * @param routingKey 路由key
     * @param date 业务数据
     */
    void send(Long businessId, String businessType, String routingKey, String date);
}

驱动实现类

@Slf4j
@Component
@EnableScheduling
public class DriveManageImpl implements DriveManage {
    @Autowired
    private SendMessageService sendMessageService;
    @Autowired
    private ReceiveMessageService receiveMessageService;
    @Lazy
    @Autowired
    private SendExecute sendExecute;
    @Lazy
    @Autowired
    private ReceiveExecute receiveExecute;
    /** 驱动 - 消息队列名称(必须为每个业务系统建立一个队列) */
    @Value("${drive.message.queue.name}")
    private String messageQueueName;
    /** 默认exchange */
    @Value("${spring.rabbitmq.template.exchange}")
    private String exchange;
    /** 驱动 - 本服务routingKey */
    @Value("${drive.message.routingKey}")
    private String routingKey;
    @Bean
    public RabbitTemplate driveRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate driveRabbitTemplate = new RabbitTemplate(connectionFactory);
        driveRabbitTemplate.setExchange(exchange);
        driveRabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                // 修改消息发送成功
                sendMessageService.updateMessageState(correlationData.getId(), SendHandleStateEnum.SEND_SUCCESS.getCode());
                Home("drive消息确认 - 发送成功 ID{}", correlationData.getId());
            } else {
                sendMessageService.updateMessageState(correlationData.getId(), SendHandleStateEnum.SEND_FAIL.getCode());
                log.error("drive消消息确认 - 发送失败 ID{}", correlationData.getId());
            }
        });
        return driveRabbitTemplate;
    }
    @Override
    public void send(Long businessId, String businessType, String date) {
        sendExecute.execute(businessId, businessType, routingKey, date);
    }
    @Override
    public void send(Long businessId, String businessType, String routingKey, String date) {
        sendExecute.execute(businessId, businessType, routingKey, date);
    }
    @Bean
    public MessageListenerContainer openAccountListenerContainer(ConnectionFactory connectionFactory, ReceiveListener receiveListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(messageQueueName);
        container.setMessageListener(receiveListener);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }
    /**
     * 同步发送记录
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    public void synSend() {
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, -1);
        String endGmtCreate = new SimpleDateFormat(DateConstant.DATETIME_FORMAT).format(calendar.getTime());
 
        // 查询一分钟之前发送状态为待确认的
        List<Integer> sendStateList = new ArrayList<>();
        sendStateList.add(SendHandleStateEnum.SEND_FAIL.getCode());
        sendStateList.add(SendHandleStateEnum.WAITING_CONFIRM.getCode());
        List<SendMessage> sendMessagesListSelect = sendMessageService.findOverdueOrder(sendStateList, endGmtCreate);
        if (sendMessagesListSelect.isEmpty()) {
            return;
        }
        // 调用发送执行方法
        for (SendMessage sendMessage : sendMessagesListSelect) {
            sendExecute.sendMessage(sendMessage);
        }
    }
    /**
     * 同步接收记录
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    public void synReceive() {
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, -1);
        String endGmtCreate = new SimpleDateFormat(DateConstant.DATETIME_FORMAT).format(calendar.getTime());
        // 查询一分钟之前发送状态为初始
        List<Integer> receiveStateList = new ArrayList<>();
        receiveStateList.add(ReceiveHandleStateEnum.INIT.getCode());
        List<ReceiveMessage> receiveMessageList = receiveMessageService.findOverdueOrder(receiveStateList, endGmtCreate);
        if (receiveMessageList.isEmpty()) {
            return;
        }
        // 调用接收执行方法
        for (ReceiveMessage receiveMessage : receiveMessageList) {
            receiveExecute.execute(receiveMessage.getMessageId(), receiveMessage.getBusinessType());
        }
    }
}

业务实现例子

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Starter.class)
public class MainTest {
    @Autowired
    private DriveManage driveManage;
    public String testSend() {
        return driveManage.send(0L, "order.pay.result", "支付成功");
        return driveManage.send(1L, "order.create.result", "下单成功");
    }
}
@Slf4j
@Service
public class Receive1 implements ReceiveHandle {
    @Override
    public String getName() {
        return "order.pay.result";
    }
    @Override
    public void execute(String data) {
        Home("驱动业务实现{},接收参数{}", getName(), data);
        Home("订单支付成功");
    }
 
}
@Slf4j
@Service
public class Receive2 implements ReceiveHandle {
    @Override
    public String getName() {
        return "order.create.result";
    }
    @Override
    public void execute(String data) {
        Home("驱动业务实现{},接收参数{}", getName(), data);
        Home("订单下单成");
    }
}

引用方配置

drive:
  message:
    queue:
      name: topic.transaction.drive.order
    routingKey: order.transaction.orde
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容