消息重复被消费
今天日常开始统计系统信息(rabbitmq),被业务提醒数据出现异常,查找日志发现某条订单的消息被消费了两次。
以前我以为消息队列中的消息不会被重复消费,今天被打脸了,根据日志,消息在20秒内被消费了两次。在我系统中,我手动配置了消息确认。因该是返回确认的消息延迟了,还是怎么回事,导致出现这种结果。
上网查找解决方案,有两种方案:
- 消费端处理消息的业务逻辑保持幂等性
- 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。
第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。
第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。
因为业务逻辑是直接更新数据库,所以采用第二中方案。
实际项目不断摸索逐渐摸索出一套自认为安全的流程,先记录下
/**
* 系统通用消息队列
*
* @param connectionFactory 链接工厂
* @return mq监听容器
*/
@Bean
public SimpleMessageListenerContainer messageContainer5(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(commonQueue);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(10);
container.setConcurrentConsumers(5);
container.setPrefetchCount(10);
// 设置确认模式手工确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
List<String> messageIds = new ArrayList<>();
StatisticMessage statisticMessage = null;
Integer dealNum = null;
try {
byte[] body = message.getBody();
String str = new String(body);
str = str.substring(1, str.length() - 1);
str = StringEscapeUtils.unescapeJava(str);
//消息队列的集合
messageIds.addAll((List<String>) redisManager.getRedisObj(messageRecode));
statisticMessage = JSONObject.parseObject(str, StatisticMessage.class);
//获取处理当前消息的次数
String dealNumString = redisManager.getObjByDB(5, statisticMessage.getMessageId());
if (CheckUtil.isEmpty(dealNumString)) {
redisManager.setObjDBAndTime(5, statisticMessage.getMessageId(), Integer.toString(0), 60 * 60 * 12);
dealNumString = String.valueOf(0);
}
dealNum = Integer.parseInt(dealNumString);
//超出重复处理次数,舍弃当前消息体
if (dealNum > MESSAGE_AGAIN_NUM) {
//确认消息成功消费,删除消息队列中的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//删除redis中信息
redisManager.delObjDB(5, statisticMessage.getMessageId());
}
//正式处理消息相关逻辑
if (messageIds.contains(statisticMessage.getMessageId())) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
String str2 = statisticMessage.getJsonMessage();
str2 = StringEscapeUtils.unescapeJava(str2);
//刷新用户缓存
if (statisticMessage.getSratisticType() == StatisticType.FreshScondToken.getCode()) {
List<String> userIds = JSONObject.parseObject(str2, List.class);
//do somethings
secondTokenService.refreshRepairShopTokenInfo(userIds);
messageIds.add(statisticMessage.getMessageId());
redisManager.createRedisObjNoValid(messageRecode, messageIds);
//手工发送确认。
if (channel != null) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
} catch (Exception e) {
messageIds.remove(statisticMessage == null ? null : statisticMessage.getMessageId());
redisManager.createRedisObjNoValid(messageRecode, messageIds);
//添加一次处理次数
dealNum++;
redisManager.setObjDBAndTime(5, statisticMessage.getMessageId(), Integer.toString(dealNum), 60 * 60 * 24 * 12);
// ack返回false,并重新回到队列,api里面解释得很清楚
if (channel != null) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
logger.error(ExceptionUtil.stacktraceToString(e));
}
}
});
return container;
}
上述代码逻辑规整
graph LR
接受到消息,并进行相关处理,转换成业务对象-->根据消息中的唯一的messageid判断是否处理完成当前消息
根据消息中的唯一的messageid判断是否处理完成当前消息--> 处理过
处理过-->舍弃
根据消息中的唯一的messageid判断是否处理完成当前消息--> 未处理过
未处理过--> 判断当前消息被处理的次数
判断当前消息被处理的次数 --> 处理次数大于5
处理次数大于5--> 序列化该消息,短期放弃处理,交由人工处理
判断当前消息被处理的次数 --> 处理次数小于5
处理次数小于5 --> 继续处理
继续处理 -->处理中出现异常则处理次数加一,让消息返回队列
继续处理 -->处理中出现正常则在messageid的集合中添加当前messageid