一、事务
RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback()。txSelect()开启事务,生产者发送消息内容给mq,这是一阶段提交。然后本地可以继续处理自己的业务逻辑,处理完提交事务,就发送提交事务的消息给mq,mq就可以直接后续处理了。如果本地处理有问题,回滚本地业务,发送一个回滚事务的消息给mq,mq就知道这条消息作废了,进行回滚,不进行后续的操作了。
但事务机制是一个同步的过程,效率相对较低,如果对数据一致性要求很高的话可以使用事务机制。
二、ack消息确认
rabbitmq的confirm模式是异步的,所以相对效率会高很多。
1.rabiitmq消息确认分为两种:
1.发送消息的确认。分为消息发送到交换机的确认、消息发送到队列的确认
2.接收消息的确认。
2.springboot集成rabiitmq的确认模式:
acknowledgeMode有三值:
A、NONE = no acks will be sent (incompatible with channelTransacted=true).
RabbitMQ calls this "autoack" because the broker assumes all messages are acked without any action from the consumer.
B、MANUAL = the listener must acknowledge all messages by calling Channel.basicAck().
C、AUTO = the container will acknowledge the message automatically, unless the MessageListener throws an exception.
简单来说也就是:
none:不确认,不会发送任何ack
manual:手动确认,发送端和客户端都需要手动确认
auto:自动确认,就是自动发ack,除非抛异常。
3.代码
配置:
@Configuration
public class MqConsumerConfig {
public final static String QUEUE_ACK_NAME = "orderme-queue.yannic.ack";
public static final String ORDER_WEBSOCKET_EXCHANGE = "orderme.yannic.websocket";
@Bean(name="orderTopicAckQueue")
public Queue orderTopicAckQueue() {
return new Queue(QUEUE_ACK_NAME);
}
@Bean(name = "orderWebSocketExchange")
public TopicExchange orderWebSocketExchange() {
return new TopicExchange(ORDER_WEBSOCKET_EXCHANGE);
}
@Bean
Binding bindingExchangeAckMessage(@Qualifier("orderTopicAckQueue") Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("yannic.*");
}
/**
* 定制化amqp模版
* connectionFactory:包含了yml文件配置参数
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 必须设置为 true,不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调
// 而且 ReturnCallback 比 ConfirmCallback 先回调,意思就是 ReturnCallback 执行完了才会执行 ConfirmCallback
rabbitTemplate.setMandatory(true);
// 设置 ConfirmCallback 回调 yml需要配置 publisher-confirms: true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
// 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false
// 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意)
if (ack) {
String messageId = correlationData.getId();
System.out.println("confirm:"+messageId);
}
});
// 设置 ReturnCallback 回调 yml需要配置 publisher-returns: true
// 如果发送到交换器成功,但是没有匹配的队列,就会触发这个回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText,
exchange, routingKey) -> {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("return:"+messageId);
});
return rabbitTemplate;
}
}
发送端:
/**
* 发送信息确认ack
* @param exchange
* @param routingKey
* @param object
*/
public void sendMessageAck(String exchange, String routingKey, Object object) {
logger.info("mq消息发送开始===》");
try {
//CorrelationData用于confirm机制里的回调确认
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchange,routingKey,JSON.toJSONString(object),correlationData);
logger.info("mq消息发送结束==》{}", object);
} catch (Exception e) {
logger.error(String.format("mq 发送 %s 的数据 %s 异常", exchange, object), e);
} finally {
}
}
消费端:
/**
* 手动确认ack
* @param msg
*/
@RabbitListener(queues = MqConsumerConfig.QUEUE_ACK_NAME)
public void consumeTopicAckMessage(Message msg, Channel channel) {
logger.info("接收的消息为:{}",msg.getBody());
try {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
logger.error("接收mq消息失败:{}",msg);
}
}
4.深入思考
生产者发送消息给mq,遇到网络抖动或者mq这时候宕机了,没有收到mq的ack怎么办?
方案一:就是事务控制咯。这个就是效率慢,rabiitmq的事务与confirm不能同时使用.
方案二:生产者这边业务控制。比如生产者每次发消息之前先把消息保存到本地,如果收到ack就把这个消息给删除,没有收到就隔一段时间重试,最多重试个3次,还是没收到就把这个消息登记起来后续处理,不再发送了。