RabbitMQ的主要作用基本上可以用8个字概括,削峰填谷异步解耦。但是引入MQ我们也不得不考虑引入MQ后带来的一些问题,如消息丢失。
在一些业务场景不一样,处理方式也就不一样,比如发短信,日志收集我们主要看吞吐量所以对消息丢失容忍度较高,这类场景基本上不用花太多时间在消息丢失问题上。另外一种,如我们用MQ来做分布式事务,续保计算,提成的计算,这类业务对消息丢失容忍度较底,所以我们一定要考虑消息丢失的问题。这次分享的内容是怎么来最大限制的防止消息丢失,顺带提一下消息的重发和重复消费。
RabbitMQ 模型图
ConfirmCallback和ReturnCallback
在这个里我们主要实现了ConfirmCallback和ReturnCallback两个接口。这两个接口主要是用来发送消息后回调的。因为rabbit发送消息是只管发,至于发没发成功,发送方法不管。
- ConfirmCallback:当消息成功到达exchange的时候触发的ack回调。
- ReturnCallback:当消息成功到达exchange,但是没有队列与之绑定的时候触发的ack回调。发生网络分区会出现这种情况。
在这里一定要把这两个开关打开, publisher-confirms="true" publisher-returns="true"。
生产者端使用ConfirmCallback和ReturnCallback回调机制,最大限度的保证消息不丢失,对原有CorrelationData类进行扩展,来实现消息的重发,具体请看源码。
消息的日志链路跟踪
使用MQ来解耦服务,异步化处理一些复杂耗时逻辑,但是也带来了一个问题。由于异步化以后,排查问题就很不方便了,根本不知道这个消息什么时候消费,消费的日志也很不好排查。所以引入了Slf4j MDC机制将主线程的日志链路和消息的日志链路连起来,方便MQ问题的排查。
RabbitSender
import com.alibaba.fastjson.JSON;
import com.wlqq.insurance.common.enums.MetricNameEnum;
import com.wlqq.insurance.common.enums.SystemTypeEnum;
import com.wlqq.insurance.common.log.core.FisLoggerFactory;
import com.wlqq.insurance.common.mq.CorrelationData;
import com.wlqq.insurance.common.service.AlertService;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.UUID;
/**
* Rabbit 发送消息
*
* @author yuhao.wang
*/
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
private final Logger logger = FisLoggerFactory.getLogger(RabbitSender.class);
@Value("${mq.retry.count}")
private int mqRetryCount;
/**
* 告警服务
*/
@Autowired
private AlertService alertService;
/**
* Rabbit MQ 客户端
*/
private RabbitTemplate rabbitTemplate;
/**
* 发送MQ消息,异步
*
* @param exchangeName 交换机名称
* @param routingKey 路由名称
* @param message 发送消息体
*/
public void sendMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
Assert.notNull(message, "message 消息体不能为NULL");
Assert.notNull(exchangeName, "exchangeName 不能为NULL");
Assert.notNull(routingKey, "routingKey 不能为NULL");
// 获取CorrelationData对象
CorrelationData correlationData = this.correlationData(message, message.getMessageId());
correlationData.setExchange(exchangeName);
correlationData.setRoutingKey(routingKey);
correlationData.setMessage(message);
logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
// 发送消息
this.convertAndSend(exchangeName, routingKey, message, correlationData);
}
/**
* RPC方式,发送MQ消息
*
* @param exchangeName 交换机名称
* @param routingKey 路由名称
* @param message 发送消息体
*/
public void sendAndReceiveMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
Assert.notNull(message, "message 消息体不能为NULL");
Assert.notNull(exchangeName, "exchangeName 不能为NULL");
Assert.notNull(routingKey, "routingKey 不能为NULL");
// 获取CorrelationData对象
CorrelationData correlationData = this.correlationData(message, message.getMessageId());
correlationData.setExchange(exchangeName);
correlationData.setRoutingKey(routingKey);
correlationData.setMessage(message);
logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message);
}
/**
* 用于实现消息发送到RabbitMQ交换器后接收ack回调。
* 如果消息发送确认失败就进行重试。
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
CorrelationData correlationDataExtends = null;
if (correlationData instanceof CorrelationData) {
correlationDataExtends = (CorrelationData) correlationData;
if (correlationDataExtends.getMdcContainer() != null) {
// 日志链路跟踪
MDC.setContextMap(correlationDataExtends.getMdcContainer());
}
}
// 消息回调确认失败处理
if (!ack) {
if (correlationDataExtends != null) {
//消息发送失败,就进行重试,重试过后还不能成功就记录到数据库
if (correlationDataExtends.getRetryCount() < mqRetryCount) {
logger.info("MQ消息发送失败,消息重发,消息ID:{},重发次数:{},消息体:{}", correlationDataExtends.getId(),
correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));
// 将重试次数加一
correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);
// 重发发消息
this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
correlationDataExtends.getMessage(), correlationDataExtends);
} else {
//消息重试发送失败,将消息放到数据库等待补发
logger.error("MQ消息重发失败,消息ID:{},消息体:{}", correlationData.getId(),
JSON.toJSONString(correlationDataExtends.getMessage()));
alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(),
correlationDataExtends.getExchange(), null);
}
}
} else {
logger.info("消息发送成功,消息ID:{}", correlationData.getId());
}
}
/**
* 用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
* 在脑裂的情况下会出现这种情况。
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 反序列化消息
Object msg = rabbitTemplate.getMessageConverter().fromMessage(message);
if (msg instanceof com.wlqq.insurance.common.mq.message.Message) {
// 日志链路跟踪
MDC.setContextMap(((com.wlqq.insurance.common.mq.message.Message) msg).getMdcContainer());
}
logger.error("MQ消息发送失败,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息体:{}",
replyCode, replyText, exchange, routingKey, JSON.toJSONString(msg));
alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
}
/**
* 消息相关数据(消息ID)
*
* @param message 消息体
* @param messageId 消息ID
* @return
*/
private CorrelationData correlationData(Object message, String messageId) {
// 消息ID默认使用UUID
if (StringUtils.isEmpty(messageId)) {
messageId = UUID.randomUUID().toString();
}
return new CorrelationData(messageId, message);
}
/**
* 发送消息
*
* @param exchange 交换机名称
* @param routingKey 路由key
* @param message 消息内容
* @param correlationData 消息相关数据(消息ID)
* @throws AmqpException
*/
private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) {
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
} catch (Exception e) {
logger.error("MQ消息发送异常,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);
alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
}
}
@Override
public void afterPropertiesSet() throws Exception {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
}
CorrelationData
import lombok.Data;
import org.slf4j.MDC;
import java.util.Map;
/**
* 发送消息的相关数据
*
* @author yuhao.wang
*/
@Data
public class CorrelationData extends org.springframework.amqp.rabbit.support.CorrelationData {
/**
* MDC容器
* 获取父线程MDC中的内容,做日志链路
*/
private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();
/**
* 消息体
*/
private volatile Object message;
/**
* 交换机名称
*/
private String exchange;
/**
* 路由key
*/
private String routingKey;
/**
* 重试次数
*/
private int retryCount = 0;
public CorrelationData(String id) {
super(id);
}
public CorrelationData(String id, Object data) {
this(id);
this.message = data;
}
}
Message
/**
* MQ消息的父类消息体
*
* @author yuhao.wang
*/
@Data
public class Message implements Serializable {
private static final long serialVersionUID = -4731326195678504565L;
/**
* MDC容器
* 获取父线程MDC中的内容,做日志链路
*/
private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();
/**
* 消息ID(消息的唯一标示)
*/
private String messageId;
}
AbstractConsumer
/**
* 默认消费者
*
* @author yuhao.wang3
*/
public abstract class AbstractConsumer implements MessageListener {
private static final Logger LOGGER = FisLoggerFactory.getLogger(AbstractConsumer.class);
@Override
public void onMessage(Message msg) {
String body = null;
try {
// 日志链路跟踪逻辑
body = new String(msg.getBody(), "utf-8");
DefaultMessage message = JSON.parseObject(body, DefaultMessage.class);
Map<String, String> container = message.getMdcContainer();
if (container != null) {
// 日志链路跟踪
MDC.setContextMap(message.getMdcContainer());
}
} catch (Exception e) {
LOGGER.warn("没有找到MQ消息日志链路数据,无法做日志链路追踪");
}
try {
// 处理消息逻辑
doMessage(msg);
LOGGER.info("成功处理MQ消息, 消息体:{}", body);
} catch (Exception e) {
LOGGER.error("处理MQ消息异常 {}, 消息体:{}", JSON.toJSONString(msg), body, e);
}
}
/**
* 处理消息的实现方法
*
* @param msg
*/
public abstract void doMessage(Message msg);
}
源码
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-rabbitmq 工程