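RabbitMQ is one of the few message queues with an official Spring integration. There are plenty of spring-rabbit tutorials online, but I never found one that shows how to implement ack in code. I used it in an earlier project, so I am posting that code here. I was lazy at the time and did not look up the relevant API documentation on the official site; if you have good references, please share them.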
pom
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    <rabbit:connection-factory virtual-host="***" id="connectionFactory" host="rabbitmq.com" username="***"
                               password="***" port="***"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--add mq for cooperation record-->
    <!-- queue declaration -->
    <rabbit:queue id="addque" durable="true" auto-delete="false" exclusive="false" name="addque"/>
    <!-- bind the queue to the exchange with a routing key -->
    <rabbit:direct-exchange name="queExchange" durable="true" auto-delete="false" id="queExchange">
        <rabbit:bindings>
            <rabbit:binding queue="addque" key="addque"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- Spring AMQP template declaration -->
    <rabbit:template exchange="queExchange" id="amqpTemplate" connection-factory="connectionFactory" />
    <bean id="queListener" class="com.rabbitmq.listener.QueListener"/>
    <!-- manual acknowledgement: the listener itself must ack or nack every message -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
        <rabbit:listener queues="addque" ref="queListener" method="onMessage"/>
    </rabbit:listener-container>
</beans>
package com.rabbitmq.listener;
import java.io.IOException;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.bean.Que;
import com.bean.dto.QueDto;
import com.enums.RecordErrEnum;
import com.exception.QueException;
import com.service.QueService;
import com.util.queConversionUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
@Service("queListener")
public class QueListener implements ChannelAwareMessageListener {

    private static final Logger logger = Logger.getLogger(QueListener.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    private QueService queService;

    @Override
    public void onMessage(Message message, Channel channel) throws IOException {
        logger.info("get mq message for que " + new String(message.getBody()));
        try {
            QueDto queDto = MAPPER.readValue(message.getBody(), QueDto.class);
            // business logic goes here
        }
        catch (QueException ce) {
            // requeue only for one specific, retryable error
            if (ce.getErrCode() == RecordErrEnum.HTTP_SEND_ERR.getErrCode()) {
                logger.info("call to the passport snsId API failed, recording again in 3 seconds " + new String(message.getBody()));
                try {
                    Thread.sleep(3000);
                    // nack with requeue=true: report failure and ask the broker to redeliver
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    return;
                }
                catch (Exception e) {
                    logger.error("requeue of mq message " + new String(message.getBody()) + " failed", e);
                }
            }
        }
        catch (Exception e) {
            logger.error("save que " + new String(message.getBody()) + " error", e);
        }
        try {
            // always ack in the end so the message does not block the queue
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
        catch (IOException e) {
            logger.error("deal Record message is " + new String(message.getBody()) + " error", e);
        }
    }
}
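RabbitMQ's ack mechanism is simple: implement the ChannelAwareMessageListener interface and override onMessage. The Channel channel parameter is what controls acknowledgement. Calling channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true) tells the broker that processing failed and that the message should be requeued and delivered again, while channel.basicAck(deliveryTag, false) confirms it. This only works because the listener container is configured with acknowledge="manual".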
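The decision the listener makes for each delivery can be summarised in a small, broker-free sketch. This is only a model of the control flow above, not part of Spring AMQP or the RabbitMQ client: the names AckDecision, Action and RetryableException are hypothetical, and RetryableException stands in for a QueException carrying the HTTP_SEND_ERR code.

```java
// Broker-free model of the listener's acknowledgement decision.
// AckDecision, Action and RetryableException are hypothetical names used for illustration.
final class AckDecision {

    /** What the listener tells the broker about one delivery. */
    enum Action {
        ACK,          // maps to channel.basicAck(deliveryTag, false)
        NACK_REQUEUE  // maps to channel.basicNack(deliveryTag, false, true)
    }

    /** Stand-in for a QueException whose error code is HTTP_SEND_ERR. */
    static final class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    /**
     * @param failure the exception thrown by the business logic, or null on success
     * @return the acknowledgement to send for this delivery
     */
    static Action decide(Exception failure) {
        if (failure instanceof RetryableException) {
            // Transient error: ask the broker to put the message back on the queue.
            return Action.NACK_REQUEUE;
        }
        // Success, or a failure that a retry would not fix: acknowledge anyway
        // so the message does not block the queue.
        return Action.ACK;
    }
}
```

In basicNack the second argument (multiple) is false so only this one delivery is affected, and the third (requeue) is true so the broker redelivers it. Note that a message which always hits the retryable error is requeued every time it is delivered.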
Sending a message is just as simple.
@Autowired
private AmqpTemplate amqpTemplate;
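Inject AmqpTemplate, then send the object with the appropriate routing key. The template is bound to the queExchange exchange, so the first argument below is the routing key.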
amqpTemplate.convertAndSend("addque", MAPPER.writeValueAsString(xxxx));