组件
RabbitMQ是AMQP协议的实现。它主要包括以下组件:
- Server: RabbitMQ实例
- Vertual Host: 是一个虚拟概念,用于权限控制,一个Virtual Host里面可以有若干个Exchange和Queue,不同Vertual Host不可见。
- Queue:消息队列,用于存储还未被消费者消费的消息。
- Exchange:接受生产者发送的消息,根据自身模式转法给不同的Queue。常用的有3种direct、Fanout和Topic。
- Routing Key:发送方发送消息时,需要指定exchange与routing key,在direct、topic模式的exchange下结合binding key决定消息路由到哪个Queue中。
- Binding Key:消息接收方创建Queue与Exchange的绑定规则时指定,就是Routing Key的正则,代表条件的Routing Key会被转发到本队列。
例子
这里举一个Topic模式的例子。
- 发送方配置
spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=10.20.2.240
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=false
spring.rabbitmq.virtual-host=/
- 发送方配置类:
@Configuration
public class RabbitConfig {
@Bean
public TopicExchange topicExCertification() {
return new TopicExchange("certification_exchange");
}
}
- 发送类(不要在意Msg类,是我自定义的,测试直接扔个字符串过去就行)
@Service
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void topicSendMessage(final String data) {
final String context = " -- from RoutingKey : topic.message";
final Msg msg = Msg.builder().data(data + context).sendTime(new DateTime()).build();
System.out.println("Sender : " + msg.toString());
// exchangeName, routingKey, msg
rabbitTemplate.convertAndSend(Constant.TOPIC_EX, "topic.message", msg);
}
}
- 接收方配置
spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=10.20.2.240
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.acknowledge-mode=none
spring.rabbitmq.virtual-host=/
- 接收方配置类
@Configuration
public class RabbitConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(Constant.TOPIC_EX);
}
// topic 方式
@Bean
public Queue queueMessageAll() {
return new Queue(Constant.Q_TOPIC_MESSAGE_ALL);
}
/**
* binding_key=topic.# 模糊匹配routing_key为topic.xxx发过来的消息加放队列
*/
@Bean
public Binding bindingExchangeMessageAll(final Queue queueMessageAll, final TopicExchange topicExchange) {
return BindingBuilder.bind(queueMessageAll).to(topicExchange).with("topic.#");
}
}
- 接收方监听队列类
@Component
public class Receiver {
@RabbitListener(queues = Constant.Q_TOPIC_MESSAGE_ALL)
@RabbitHandler
public void topicReceiver2(final Msg msg) {
System.out.println("Topic Message All Receiver:" + msg.toString());
}
}
网上看到一些例子,发送方和接收方都写在一起,让人看起来很懵,
虽然 Exchange,Queue,Binding,都可以在一端建,不过本着设计原则,这里建议:
- 发送方只关注往哪个Exchange 用什么规则(Routing key)发,因为你如果对外服务,只需要发布这个事件,而不需要关注谁要消费
- 接收方需要定义对列,并将对列以某规则(Binding key)绑定到Exchange上,所以你要知道Exchange、Queue、Binding key。
接收方启用ACK如何回复
直接看代码
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = Constant.Q_TOPIC_MESSAGE_ALL)
@RabbitHandler
public void topicReceiver2(final Message message, final Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
try {
// Queue中的数据在message.getBody()里
} catch (Exception ex) {
} finally {
channel.basicAck(deliveryTag, false);
}
}
具体demo:参考git@github.com:spadekingdom/demo-mq.git