工作原理:生产者经过交换机向队列中发送消息,交换机会把消息发送到订阅了当前消息的队列中。
交换机实现的是动态的数据分发。分发的原则是谁订阅了我,我向谁发。
C1只能消费第一个对列中的内容。
C2只能消费第二个对列中的内容。
发布订阅模式只能做到单纯地进行发布,不能指定某一个队列进行发布。
[root@bogon rabbitmq-server-3.6.1]# cd /etc/rabbitmq
[root@bogon rabbitmq]# service rabbitmq-server start
Starting rabbitmq-server: SUCCESS
rabbitmq-server.er.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rabbit</groupId>
<artifactId>schoolmanage</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>
</project>
package schoolmanage;
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class TestStudentMsgFanOut {
private Connection connection = null;
@Before
public void init() throws IOException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 为工厂对象添加数据
// 远程主机
factory.setHost("192.168.6.130");
// 端口号
factory.setPort(5672);
// 虚拟主机
factory.setVirtualHost("/school");
// 用户名
factory.setUsername("student");
// 密码
factory.setPassword("student");
// 创建连接
connection = factory.newConnection();
}
// 消息生产者
@Test
public void provider() throws IOException {
// 创建通道
Channel channel = connection.createChannel();
// 创建交换机
String exchangeName = "E1";
/**
* 定义交换机模式 String exchange String type fanout发布订阅 redirect路由模式 topic主题模式
*/
channel.exchangeDeclare(exchangeName, "fanout");
for (int i = 0; i < 6; i++) {
String msg = "今天学校下雨了!!!——发布订阅模式,这是第" + i + "次通知";
channel.basicPublish(exchangeName, "", null, msg.getBytes());
}
// 将流关闭
channel.close();
connection.close();
System.out.println("消息发送成功!!!");
}
// 消费者1
@Test
public void consumer1() throws Exception {
// 创建通道
Channel channel = connection.createChannel();
// 定义对列
String queueName = "counsumer1";
channel.queueDeclare(queueName, false, false, false, null);
// 订阅E1交换机的消息
String exchangeName = "E1";
// 定义交换机模式
channel.exchangeDeclare(exchangeName, "fanout");
// 将对列与交换机绑定
channel.queueBind(queueName, exchangeName, "");
// 只允许一次执行一个消息
channel.basicQos(1);
// 定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 将消费者与队列进行绑定
/**
* 定义回复方式 autoAck为false表示手动返回
*/
channel.basicConsume(queueName, false, consumer);
System.out.println("消费者1,启动。。。");
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = "消费者1收到 :" + new String(delivery.getBody());
System.out.println(msg);
// 告知rabbitMq当前消费的是哪一个消息
/**
* multiple false表示不扩展
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
// 消费者2
@Test
public void consumer2() throws Exception {
// 创建通道
Channel channel = connection.createChannel();
// 定义对列
String queueName = "counsumer2";
channel.queueDeclare(queueName, false, false, false, null);
// 订阅E1交换机的消息
String exchangeName = "E1";
// 定义交换机模式
channel.exchangeDeclare(exchangeName, "fanout");
// 将对列与交换机绑定
channel.queueBind(queueName, exchangeName, "");
// 只允许一次执行一个消息
channel.basicQos(1);
// 定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 将消费者与队列进行绑定
/**
* 定义回复方式 autoAck为false表示手动返回
*/
channel.basicConsume(queueName, false, consumer);
System.out.println("消费者2,启动。。。");
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = "消费者2收到 :" + new String(delivery.getBody());
System.out.println(msg);
// 告知rabbitMq当前消费的是哪一个消息
/**
* multiple false表示不扩展
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者1,启动。。。
消费者2,启动。。。
消息发送成功!!!
消费者1,启动。。。
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第0次通知
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第1次通知
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第2次通知
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第3次通知
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第4次通知
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第5次通知
消费者2,启动。。。
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第0次通知
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第1次通知
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第2次通知
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第3次通知
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第4次通知
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第5次通知