一、前言
发布订阅模式,即producer发送者,发布一个消息,多个接收者都能获取到同样的消息。大致流程是,发送者将消息发送到指定的交换机,交换机将消息发布到绑定到该交换的所有队列里,接收者通过队列获取消息。
二、发送者Producer
1.FanoutExchangeConfig.java
新建交换机,并将队列A B C绑定到该交换机
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutExchangeConfig {
/**
* 新建交换机,并将队列A B C绑定到该交换机
*/
public static final String FANOUT_EXCHANGE = "weather_fanout_exchange";
public static final String FANOUT_EXCHANGE_QUEUE_A = "fanout_exchange_queue_a";
public static final String FANOUT_EXCHANGE_QUEUE_B = "fanout_exchange_queue_b";
public static final String FANOUT_EXCHANGE_QUEUE_C = "fanout_exchange_queue_c";
/**
* 创建A队列
*/
@Bean
public Queue queueA(){
return new Queue(FANOUT_EXCHANGE_QUEUE_A);
}
/**
* 创建B队列
*/
@Bean
public Queue queueB(){
return new Queue(FANOUT_EXCHANGE_QUEUE_B);
}
/**
* 创建C队列
*/
@Bean
public Queue queueC(){
return new Queue(FANOUT_EXCHANGE_QUEUE_C);
}
/**
* 创建fanout交换机
* @return
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
/**
* 将A队列绑定到交换机上
*/
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
/**
* 将B队列绑定到交换机上
*/
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
/**
* 将C队列绑定到交换机上
*/
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
2.发送消息到指定交换机
@RestController
@RequestMapping(value = "send")
public class ProducerController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("fanout-exchange")
public void sendExchange(){
String msg = "send msg to fanout exchange" + new Date().toString();
rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null, msg);
}
}
三、Consumer接收者
1.RabbitMqConfig.java
@Configuration
public class RabbitMqConfig {
public static final String FANOUT_EXCHANGE_QUEUE_A = "fanout_exchange_queue_a";
public static final String FANOUT_EXCHANGE_QUEUE_B = "fanout_exchange_queue_b";
public static final String FANOUT_EXCHANGE_QUEUE_C = "fanout_exchange_queue_c";
}
2.RabbitMqReceiver.java
@Component
public class RabbitMqReceiver {
private final static Logger logger = LoggerFactory.getLogger(RabbitMqReceiver.class);
@RabbitListener(queues = RabbitMqConfig.FANOUT_EXCHANGE_QUEUE_A)
public void receiverQueueA(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueA 接收到消息为:"+msg);
}
@RabbitListener(queues = RabbitMqConfig.FANOUT_EXCHANGE_QUEUE_B)
public void receiverQueueB(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueB 接收到消息为:"+msg);
}
@RabbitListener(queues = RabbitMqConfig.FANOUT_EXCHANGE_QUEUE_C)
public void receiverQueueC(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueC 接收到消息为:"+msg);
}
}
从日志可以看出来,每个队列都能收到相同的消息
以上!