作为才转后台的菜鸟,做一些插件的集成确实踩了许多坑啊。废话不多说,直接进入正题。
pom文件
maven引入
<pre>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</pre>
application配置
这是yml文件的配置,划线属性是为了消息能收到回调(对publisher来说)
config文件
<pre>
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private Queue3Listener queue3Listener;
@Bean
@Primary
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
@Primary
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
simpleRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
return simpleRabbitListenerContainerFactory;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue3());
container.setMessageListener(queue3Listener);
return container;
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(EX_CHANGE_NAME1);
}
@Bean
public Queue queue1() {
return new Queue(QUEUE1, true);
}
@Bean
public Queue queue2() {
return new Queue(QUEUE2, true);
}
@Bean
public Queue queue3() {
return new Queue(QUEUE3, true);
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTING_KEY1);
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(directExchange()).with(ROUTING_KEY2);
}
@Bean
public Binding binding3() {
return BindingBuilder.bind(queue3()).to(directExchange()).with(ROUTING_KEY3);
}
</pre>这里用了两种方式去监听消息(consumer角度)。
首先我门来讲一下 MQ的标准
几个基本概念线附上
- ConnectionFactory、Connection、Channel
connection为socket连接的封装,connectionFqactory是connection的生产工程,channel是通信的信道,实际进行数据交流的管道,因为建立connection的开销明显要比建立channel要大很多,所以数据传输真实发生在channel内 - Exchange,Queue
exchange是可以理解成一条特殊的传输通道,他会把消息投递到绑定的消息池内。
queue就是消息池了,使用前需要绑定exchange,以及自己的标志。 - exchange_key,routing_key
exchange_key决定了publisher的消息投递到哪条通道,routing_key决定了将消息放到哪个池子里 - 绑定
queue要接受消息必须与exchange进行绑定,并在绑定的时候给自己与exchange的绑定设置一个标记routing_key,以后用来匹配消息接收
exchange与queue是一对多的关系,根据exchange不同类型,分别投递到不同的消息池
下面来看看exchange的类型
- fanout
直接将消息发送到与该exchange绑定的所有queue内 - direct
对routing_key进行严格匹配,当消息来到的时候,只有exchange与某queue绑定的routing_key完全匹配才将消息投递到该queue - topic
用特殊符号进行匹配,满足条件的queue都能收到消息,这里的routing_key以"."分隔,*匹配一个单词,#匹配多个单词,如果满足多个条件也不会投递多次 - headers
不依赖routing_key匹配,根据消息体内的headers属性匹配,绑定的时候可以制定键值对
接下来来看看配置文件
1.@Bean统一注入到容器中,我们声明了connectionfactory,他会自动根据application里面的属性进行组装,这个连接对于后面的容器都是要用到的,这里要注意converter的设置,因为我们要将pojo类型进行传输,一般程序外的传输都是建立在字节流的基础上,converter就会自动转换
2.接下来我们声明queue,true属性设置为持久型的池子,当连接断开时,消息会呗保留,然后声明exchange,这里我们使用的是directexchange,接下来将两者绑定起来 - 声明SimpleMessageListenerContainer,SimpleRabbitListenerContainerFactory注意这里声明两个是因为这是消息监听的两种方式
首先讲讲SimpleMessageListenerContainer,这个需要设置确认方式,有较多属性克设置,有兴趣可自行设置,这里我只是简单的设置了一下,然后要设置listener,
listener需要实现ChannelAwareMessageListener里面有
public void onMessage(Message message, Channel channel) 的重载方法需要实现,消息体在Message的body内,相对来说信息比较完备
接下来看看SimpleRabbitListenerContainerFactory,这个有几个注意点,需要再次设置converter因为,一个是发消息的时候解析成二进制,这个则是将二进制解析成具体的类,回调相对简单一点
<pre>
@Component
@RabbitListener(queues = RabbitMQConfig.QUEUE1, containerFactory = "simpleRabbitListenerContainerFactory")
public class Queue1Listener {
private static Logger logger = LoggerFactory.getLogger(Queue1Listener.class);
@RabbitHandler
public void receive(@Payload String s) {
logger.info("listener1 info: " + s);
}
}
</pre>
记得需要containerFactory具体写出来
在接收消息的方法上写@RabbitHandler,消息体打上@payload久可以接受消息了。
其实还有个 方法就是指定一个MessageAdapter,然后在container里面就可以指定接收的方法名,不是很推荐,明文反射总感觉容易出问题
当然publisher也是有消息的回调的
RabbitTemplate下有ConfirmCallback实现confirm方法就好了