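Preface
If you can implement your business logic without a message queue, avoid using one, because it adds to the learning cost.
At my company we process messages from a large number of devices. Because of concurrency issues and message-ordering requirements, we recently adopted a message queue.
Below is how I integrated it.
What you will learn
- How to send messages through a message queue.
- How to configure one type of exchange.
- How message acknowledgment works, so that messages are not lost and are processed in order.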
Environment
Java version
java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
Development tool
IntelliJ IDEA 2018.2.1 (Ultimate Edition)
Build #IU-182.3911.36, built on August 6, 2018
Licensed to lan yu
Subscription is active until May 5, 2019
JRE: 1.8.0_152-release-1248-b8 x86_64
JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
macOS 10.13.6
docker
docker --version
Docker version 17.12.0-ce, build c97c6d6
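The code uses Lombok, so the Lombok plugin for IDEA is needed.
See the screenshot. If you are not sure how to install it, search online.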
Installing RabbitMQ
RabbitMQ is installed with Docker here.
docker run -d -p 15672:15672 -p 5672:5672 --name some-rabbit rabbitmq:management
The command above creates a container running RabbitMQ.
Two ports are mapped: 15672 is the web console port and 5672 is the messaging (AMQP) port.
After running it, open
http://127.0.0.1:15672/
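to see the web console. The default username and password are both guest.
Managing RabbitMQ through the web console is outside the scope of this article and is not covered here.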
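Before moving on, it can help to confirm that both mapped ports are actually reachable. The following is a minimal sketch using only the JDK; the class name PortCheck and the two-second timeout are my own choices for illustration, and it only tests that a TCP connection can be opened, not that RabbitMQ is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class PortCheck {
    private PortCheck() {}

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the docker run command: 5672 (messaging) and 15672 (web console).
        System.out.println("5672 reachable: " + isReachable("127.0.0.1", 5672, 2000));
        System.out.println("15672 reachable: " + isReachable("127.0.0.1", 15672, 2000));
    }
}
```

If either line prints false, check that the container is running and that the ports are not already in use on the host.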
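Integrating Spring with RabbitMQ
- Create a Spring project.
This step is skipped here. If you are not sure how to do it, download the code for this article and import it.
- Add the dependencies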
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<!-- Spring core -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
Pay attention to the versions. Versions that are too new or too old relative to each other can cause errors.
- Configure RabbitMQ
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
<!-- Connection settings -->
<rabbit:connection-factory
id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username=""
password=""
publisher-confirms="true"
/>
<rabbit:admin connection-factory="connectionFactory" />
<!-- Template configuration. mandatory must be set to true, otherwise the return callback does not take effect -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener"
mandatory="true"
/>
<!-- Queues -->
<rabbit:queue name="SERVER_DOWN1" />
<rabbit:queue name="CLIENT_UP1" />
<!-- Exchange and its queue bindings. "direct" is one of the exchange types. -->
<rabbit:direct-exchange name="DIRECT_EX1" id="DIRECT_EX1" >
<rabbit:bindings>
<rabbit:binding queue="CLIENT_UP1" key="CLIENT_UP1" />
<rabbit:binding queue="SERVER_DOWN1" key="SERVER_DOWN1" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- Consumer -->
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual" >
<rabbit:listener queues="CLIENT_UP1" ref="lightaiConsumer" />
</rabbit:listener-container>
</beans>
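Here is a brief overview of the exchange types.
Direct exchange: if a binding is created with the routing key "abc", only messages published with the routing key "abc" are delivered to that queue.
Topic exchange: "#" matches zero or more words and "*" matches exactly one word. For example, "abc.#" matches "abc.def.ghi", while "abc.*" matches "abc.def" but not "abc.def.ghi".
Fanout exchange: works in broadcast mode. Every incoming message is delivered to all queues bound to the exchange.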
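To make the topic wildcards concrete, here is a small plain-Java sketch of the matching rule. It is only an illustration of the "#" and "*" semantics described above, not RabbitMQ's actual implementation, and the class name TopicMatcher is made up for this example.

```java
/** Hypothetical illustration of topic-exchange pattern matching. */
final class TopicMatcher {
    private TopicMatcher() {}

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words; try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // "*" matches exactly one word; anything else must be equal.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

For instance, TopicMatcher.matches("abc.*", "abc.def.ghi") is false because "*" consumes only one word, while the same key matches "abc.#".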
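The configuration above uses only a direct exchange. It means:
Messages with the routing key CLIENT_UP1 go to the CLIENT_UP1 queue.
Messages with the routing key SERVER_DOWN1 go to the SERVER_DOWN1 queue.
Only one consumer is configured, so only messages in the CLIENT_UP1 queue are consumed.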
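The routing behaviour of this configuration can be sketched in plain Java. This is a simplified in-memory model written for illustration (the class DirectExchangeSketch is hypothetical and allows only one queue per routing key, whereas a real direct exchange can bind several); it is not how the broker is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of a direct exchange with one queue per routing key. */
final class DirectExchangeSketch {
    private final Map<String, Deque<String>> queues = new HashMap<>();
    private final Map<String, String> bindings = new HashMap<>(); // routing key -> queue name

    /** Binds a queue to the exchange under the given routing key. */
    void bind(String queueName, String routingKey) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        bindings.put(routingKey, queueName);
    }

    /** Returns true if the message was routed to a queue; false models NO_ROUTE. */
    boolean publish(String routingKey, String message) {
        String queueName = bindings.get(routingKey);
        if (queueName == null) {
            return false;
        }
        queues.get(queueName).addLast(message);
        return true;
    }

    /** Number of messages currently waiting in the named queue. */
    int depth(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }
}
```

With CLIENT_UP1 and SERVER_DOWN1 bound under their own names, publishing with the key CLIENT_UP2 returns false, which corresponds to the unroutable case that the return callback reports.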
- Consumer
package cn.wuwenfu.rabbitmqdemo.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

@Service("lightaiConsumer")
@Slf4j
public class LightaiConsumer implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Decode the payload as UTF-8 rather than relying on the platform default charset.
            String m = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("Received '{}'", m);
            // Business logic goes here; acknowledge only after it has finished.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // Acknowledge even on failure so the message does not stay unacknowledged.
            channel.basicAck(deliveryTag, false);
            // Pass the exception to the logger instead of calling printStackTrace().
            log.error("Failed to process message", e);
        }
    }
}
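Be sure to catch exceptions here and still acknowledge the message. Otherwise the message stays unacknowledged on the broker, which leads to problems in the business flow.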
- Producer
package cn.wuwenfu.rabbitmqdemo.main;

import cn.wuwenfu.rabbitmqdemo.publish.PublishService;
import cn.wuwenfu.rabbitmqdemo.publish.RabbitTemplatePublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Author hiwower@gmail.com
 * @date 2018/11/6 9:03 AM
 */
@Component
public class Test {

    private static final String EXCHANGE_NAME = "DIRECT_EX1";

    @Autowired
    private PublishService publishService;

    @Autowired
    private RabbitTemplatePublishService rabbitTemplatePublishService;

    public void test() {
        // Use the current timestamp as the message body.
        String message = String.valueOf(System.currentTimeMillis());

        // Send one message per routing key through each of the two publishers.
        publishService.send(EXCHANGE_NAME, "SERVER_DOWN1", message);
        publishService.send(EXCHANGE_NAME, "CLIENT_UP1", message);
        publishService.send(EXCHANGE_NAME, "CLIENT_UP2", message);
        rabbitTemplatePublishService.send(EXCHANGE_NAME, "SERVER_DOWN1", message);
        rabbitTemplatePublishService.send(EXCHANGE_NAME, "CLIENT_UP1", message);
        rabbitTemplatePublishService.send(EXCHANGE_NAME, "CLIENT_UP2", message);
    }
}
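The code above sends six identical messages, two to each of three routing keys.
The SERVER_DOWN1 queue receives two messages. No consumer is bound to it, so they are not consumed.
The CLIENT_UP1 queue receives two messages, which are consumed normally.
CLIENT_UP2 has no matching queue, so those two messages trigger the publisher's return callback.
RabbitTemplate is an implementation of AmqpTemplate. You can use either one to send messages.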
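- Publisher confirmation
When a message is sent, it first reaches the exchange and is then routed to a queue by its routing key.
Two callbacks are involved. A message can be considered correctly delivered only when both of them report no problem.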
package cn.wuwenfu.rabbitmqdemo.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

/**
 * Triggered for every published message.
 * ack=true if the message reached the exchange, false otherwise.
 */
@Service("confirmCallBackListener")
@Slf4j
public class ConfirmCallBackListener implements ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // Log text kept as in the original ("交换机confirm" = "exchange confirm") so it matches the console output.
        log.info("交换机confirm--:correlationData:{},ack:{},cause:{}", correlationData, ack, cause);
    }
}
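This callback always runs once a message has been sent. ack=true means success and ack=false means failure. Note that ack=true only means the broker accepted the message at the exchange: a message that cannot be routed to any queue is still confirmed with ack=true and is reported through the return callback instead.
You can add your own monitoring and error handling here. Under normal conditions ack=false should not occur.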
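One possible way to add such monitoring is to remember each message by its correlation id until its confirm arrives. The sketch below is plain Java and purely illustrative: the class PendingConfirms and its methods are hypothetical, and it assumes every message is sent with a CorrelationData whose id is passed in (sends without one, which show up as correlationData:null, cannot be tracked this way).

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry of messages that are still waiting for a publisher confirm. */
final class PendingConfirms {
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private final Map<String, String> failed = new ConcurrentHashMap<>();

    /** Call just before sending: remember the message under its correlation id. */
    void track(String correlationId, String message) {
        pending.put(correlationId, message);
    }

    /** Call from the confirm callback with the id taken from the correlation data. */
    void onConfirm(String correlationId, boolean ack) {
        if (correlationId == null) {
            return; // message was sent without correlation data; nothing to look up
        }
        String message = pending.remove(correlationId);
        if (message != null && !ack) {
            // Keep negatively acknowledged messages for retry or alerting.
            failed.put(correlationId, message);
        }
    }

    /** Number of messages sent but not yet confirmed. */
    int pendingCount() {
        return pending.size();
    }

    /** Ids of messages the broker reported as failed. */
    Set<String> failedIds() {
        return failed.keySet();
    }
}
```

In the listener above, onConfirm could be called with correlationData.getId() when correlationData is not null. What to do with failed entries (retry, alert, persist) depends on the business requirements.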
package cn.wuwenfu.rabbitmqdemo.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.stereotype.Service;

/**
 * Triggered when a message cannot be routed to a queue.
 */
@Service("returnCallBackListener")
@Slf4j
public class ReturnCallBackListener implements ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Log text kept as in the original ("路由return" = "routing return") so it matches the console output.
        log.error("路由return--message: ,msgBody:" + new String(message.getBody())
                + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:"
                + routingKey);
    }
}
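If the message is delivered to a queue normally, this callback does not run. Otherwise it is triggered, usually because no route could be found.
For example, the output below shows replyText:NO_ROUTE,exchange:DIRECT_EX1,routingKey:CLIENT_UP2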
[ ERROR ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ReturnCallBackListener - ReturnCallBackListener.java(17) - 路由return--message: ,msgBody:1542861998264,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX1,routingKey:CLIENT_UP2
[ ERROR ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ReturnCallBackListener - ReturnCallBackListener.java(17) - 路由return--message: ,msgBody:1542861998264,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX1,routingKey:CLIENT_UP2
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交换机confirm--:correlationData:null,ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交换机confirm--:correlationData:CorrelationData [id=efd7b1da-bda8-4784-b4a0-e5856fb7641b],ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.consumer.LightaiConsumer - LightaiConsumer.java(18) - Received '1542861998264'
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交换机confirm--:correlationData:null,ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交换机confirm--:correlationData:null,ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交换机confirm--:correlationData:CorrelationData [id=fe1a8a1b-fccb-47d0-a887-dcac9fdd5595],ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交换机confirm--:correlationData:CorrelationData [id=8c5535a4-91dc-435c-9308-28098c29ee30],ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.consumer.LightaiConsumer - LightaiConsumer.java(18) - Received '1542861998264'
- Consumer acknowledgment
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
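This call acknowledges the message. Be sure to acknowledge in the exception path as well; otherwise unexpected problems will follow.
Because messages are acknowledged manually, the timing is under your control: the next message is handled only after the business logic for the current one has finished.
I previously used automatic acknowledgment here, and it caused messages to be processed out of order.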
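The effect of manual acknowledgment on ordering can be pictured with a small in-memory model. This sketch is my own simplification: the class ManualAckQueueSketch is hypothetical, and it assumes the consumer is allowed at most one unacknowledged message at a time (a prefetch of one), which the configuration above does not state explicitly.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of a queue that hands out one message at a time until it is acked. */
final class ManualAckQueueSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private String unacked; // the single outstanding delivery, or null if none

    /** Adds a message to the tail of the queue. */
    void publish(String message) {
        ready.addLast(message);
    }

    /** Returns the next message, or null while a delivery is unacked or the queue is empty. */
    String deliver() {
        if (unacked != null || ready.isEmpty()) {
            return null;
        }
        unacked = ready.pollFirst();
        return unacked;
    }

    /** Acknowledges the outstanding delivery so the next message can be delivered. */
    void ack() {
        unacked = null;
    }
}
```

In this model the second message cannot be delivered until ack() is called for the first, so the business logic for each message completes before the next one starts.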
Finally
Code download: CSDN download