一、前言
路由模式,会用到Direct类型的交换机,生产者将消息发送给Direct交换机,队列绑定到该交换机,同时队列会指定接收某一个路由key的消息或者某多个路由key的消息。举个例子,生产者将info、waring、error三类日志消息发送到MQ,有两个消费者需要消费处理这些日志,其中一个消费者只消费info和waring这两类,另外一个消费者只消费error这类消息,这个场景下,路由模式就比较符合场景。
路由模式.png
二、生产者
1.DirectExchangeConfig.java
@Configuration
public class DirectExchangeConfig {
public static final String DIRECT_EXCHANGE = "direct_exchange";
public static final String QUEUE_INFO_WARING = "queue_info_waring";
public static final String QUEUE_ERROR = "queue_error";
public static final String ROUTING_KEY_INFO = "info_log";
public static final String ROUTING_KEY_WARNING = "waring_log";
public static final String ROUTING_KEY_ERROR = "error_log";
/**
* 定义一个direct类型的交换机
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE);
}
/**
* 定位一个只处理info和waring日志的队列1
*/
@Bean
public Queue queueForInfoWaringLog(){
return new Queue(QUEUE_INFO_WARING);
}
/**
* 定位一个只处理error日志的队列2
*/
@Bean
public Queue queueForInfoError(){
return new Queue(QUEUE_ERROR);
}
/**
* 将路由为info的消息绑定到队列1
*/
@Bean
public Binding bindingInfoQueueToDirect(){
return BindingBuilder.bind(queueForInfoWaringLog()).to(directExchange()).with(ROUTING_KEY_INFO);
}
/**
* 将路由为waring的消息绑定到队列1
*/
@Bean
public Binding bindingWaringQueueToDirect(){
return BindingBuilder.bind(queueForInfoWaringLog()).to(directExchange()).with(ROUTING_KEY_WARNING);
}
/**
* 将路由为waring的消息绑定到队列2
*/
@Bean
public Binding bindingErrorQueueToDirect(){
return BindingBuilder.bind(queueForInfoError()).to(directExchange()).with(ROUTING_KEY_ERROR);
}
}
2.定义controller发送消息
@RestController
@RequestMapping(value = "send")
public class ProducerController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("direct/info")
public void sendInfoLog(){
String msg = "[info] " + new Date().toString();
rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.ROUTING_KEY_INFO, msg);
}
@GetMapping("direct/waring")
public void sendWaringLog(){
String msg = "[waring] " + new Date().toString();
rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.ROUTING_KEY_WARNING, msg);
}
@GetMapping("direct/error")
public void sendErrorLog(){
String msg = "[error] " + new Date().toString();
rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.ROUTING_KEY_ERROR, msg);
}
}
二、生产者
1.RabbitMqReceiver.java
@Component
public class RabbitMqReceiver {
@RabbitListener(queues = RabbitMqConfig.QUEUE_INFO_WARING)
public void receiverQueueInfoWaring(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueInfoWaring 接收到消息为:"+msg);
}
@RabbitListener(queues = RabbitMqConfig.QUEUE_ERROR)
public void receiverQueueError(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueError 接收到消息为:"+msg);
}
}