
蓝色的框:指的是生产者将消息投递到EXchange上,然后根据routingkey路由到指定队列上
绿色框:消费者监听队列,然后接受消息。
黄色框:消息到达了exchange是路由到哪个队列,要根据routingkey而定。
下面讲解Exchange
Exchange的属性:(大致有个印象就OK,继续往下看)
1.Name:交换机名称;
2.Type:交换机类型 direct,topic,fanout,headers;
3.Durability:是否需要持久化,true为持久化,代表交换机在服务器重启后是否还存在;
4.Auto Delete :当最后一个绑定到exchange上的队列删除后,自动删除该exchange.
5.Internal:当前的exchange是否用于rabbitmq内部使用,默认为false.
6.Arguments:扩展参数,用于扩展AMQP协议自制化。
Exchange类型以及讲解
1.Direct exchange


生产者代码:
public class Producer4DirectExchange {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
//5 发送
String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
}
消费者的代码:
此时需要channel发送消息指定的routingkey和绑定exchange和队列时候的routingkey相同,直接路由到这些队列上。
如果不指定exchangeType,那么就是default Exchange,此时不需要将队列绑定到exchange.但是Routekey需要完全匹配。
public class Consumer4DirectExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//表示声明了一个交换机
//参数说明:Exchange的属性:
//1.Name:交换机名称;
//2.Type:交换机类型 direct,topic,fanout,headers;
//3.Durability:是否需要持久化,true为持久化;
//4.Auto Delete :当最后一个绑定到exchange上的队列删除后,自动删除该exchange.
//5.Internal:当前的exchange是否用于rabbitmq内部使用,默认为false.
//Arguments:扩展参数,用于扩展AMQP协议自制化
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示声明了一个队列
//参数说明:
//1.消息队列:实际存储消息数据
//2.Durability:是否持久化,3.auto_delete:如果选yes,代表最后一个监听被移除之后,该队列会自动被删除。
channel.queueDeclare(queueName, false, false, false, null);
//建立一个绑定关系:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
2.topic exchange
此方式是算是routingkey的通配符匹配模式,两张图片说明问题
符号“#” 匹配一个或多个词
符号“”匹配不多不少一个词
例如:“log.#”能够匹配到“log.info.oa”
"log."只会匹配到“log.erro”
如下图

生产者代码(用了三种routingkey发送)
public class Producer4TopicExchange {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 发送
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}
消费者代码:
public class Consumer4TopicExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
//String routingKey = "user.*";
String routingKey = "user.*";
// 1 声明交换机
channel.exchangeDeclare(exchangeName, exch angeType, true, false, false, null);
// 2 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 3 建立交换机和队列的绑定关系:
//绑定关系中指定routingkey
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
3.Fanout Exchange(不需要routingkey,只需要绑定)

特点是:
1.不处理路由键,只需要简单的将队列绑定到交换机上
2.发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
3.Fanout交换机转发消息是最快的
生产者代码:
public class Producer4FanoutExchange {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_fanout_exchange";
//5 发送
for(int i = 0; i < 10; i ++) {
String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, "", null , msg.getBytes());
}
channel.close();
connection.close();
}
}
消费者代码:
public class Consumer4FanoutExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; //不设置路由键
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}