RabbitMQ交换机详解

交换机概念

Exchange:交换机,接收消息,并根据路由键转发消息到绑定的队列
如图为官网提供的模型,蓝色框表示Send Message,Client端把消息投递到Exchange上,通过RoutingKey路由关系将消息路由到指定的队列,绿色框代表Receive Message,Client端和队列建立监听,然后去接收消息。红色框代表RabbitMQ Server,黄色框表示RoutingKey,即Exchange和Queue需要建立绑定关系。

交换机属性

  • Name: 交换机名称
  • Type: 交换机类型,direct、topic、 fanout、 headers
  • Durability: 是否需要持久化
  • Auto Delete: 当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
  • Internal: 当前Exchange是否用于RabbitMQ内部使用,默认为False
  • Arguments: 扩展参数,用于扩展AMQP协议定制化使用

Direct Exchange

直连方式,所有发送到 Direct Exchange 的消息被转发到RouteKey中指定的Queue
注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

看一下 Direct Exchange 的图解,其实意思就是说指定了RoutingKey的消息会被投递到绑定关系与该key值相同的队列上。

生产端

指定投递的Exchange和相应的RontingKey进行发送消息

public class Producer4DirectExchange {

    public static void main(String[] args) throws Exception {
        
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.43.157");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        
        //3 创建Channel
        Channel channel = connection.createChannel();  
        
        //4 声明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        
        //5 发送
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); 
        
        //6 关闭连接
        channel.close();
        connection.close();
    }   
}
消费端
  • 声明一个直连交换机
  • 声明队列
  • 建立交换机和队列的绑定关系
  • 消费者监听队列,消费消息
public class Consumer4DirectExchange {

    public static void main(String[] args) throws Exception {
        
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        connectionFactory.setHost("192.168.43.157");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // 1.设置是否自动重连(网络闪断时)      2.每3秒重连一次
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        
        //表示声明了一个交换机
        //参数分别为:name,type,durable,autoDelete,internal,arguments
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        //参数分别为:name,durable,exclusive,autoDelete,arguments
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系(exchange和queue)
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //细节一:同一个队列可以绑定多个值
        channel.queueBind(queueName, exchangeName, "666");
        //细节二:不同队列可以绑定相同的RoutingKey
        String queueName2 = "test_direct_queue2";
        channel.queueDeclare(queueName2, false, false, false, null);
        channel.queueBind(queueName2, exchangeName, routingKey);
        
        //创建消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //监听多个队列
        channel.basicConsume(queueName2, true, consumer);
        // 循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}
运行说明

先启动消费端,刷新管控台,在Exchange目录下可以看到我们声明的exchange以及type

点击该exchange可以看到和队列的绑定关系

然后启动生产端,此时消费端控制台进行了打印,共消费了两条消息,说明监听的两个队列都接收到了消息。

收到消息:Hello World RabbitMQ 4  Direct Exchange Message ... 
收到消息:Hello World RabbitMQ 4  Direct Exchange Message ... 

当我们修改生产端的routingKey值为:666时,那么只有队列一接收到消息并被消费者消费。
如果修改值为:test.direct111,此时在启动生产端,消费端就收不到消息了,这就是直连的方式。

Topic Exchange

所有发送到 Topic Exchange 的消息被转发到所有关心RouteKey中指定Topic的Queue上
Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

注意:可以使用通配符进行模糊匹配

图解

看一下 Topic Exchange 的图解,意思是说我们有4个队列,它们的绑定关系分别是usa.##.news#.weathereurope.#
对于第一个队列而言,它只关系以usa.开头的相关消息。比如发送的第一条消息是usa.news,那么这条消息会同时匹配上队列1和队列2,所以两个队列都能接收到,其他消息也是同样的规则,这里就不继续展开说了。

生产端

指定投递的Exchange和相应的RontingKey进行发送消息

        //省略创建连接和通道...
        //4 声明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        
        //5 发送
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());    
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
        //关闭连接...
消费端

声明一个Topic Exchange,声明队列,建立交换机和队列的绑定关系

         //省略创建连接和通道...     
        //4 声明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.#";
        // 声明Topic Exchange
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 建立交换机和队列的绑定关系
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //创建消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        
        //循环获取消息... 
运行说明

先启动消费端,同样可以在管控台可以看到我们新声明的exchange以及它的绑定队列,这里不再细说。然后启动生产端,此时消费端控制台进行了打印,共消费了三条消息,说明三条消息都和队列指定的Topic匹配上了,因为使用的是 # 匹配

收到消息:Hello World RabbitMQ 4 Topic Exchange Message ...
收到消息:Hello World RabbitMQ 4 Topic Exchange Message ...
收到消息:Hello World RabbitMQ 4 Topic Exchange Message ...

将消费端指定的RoutingKey进行修改:routingKey = "user.*";
然后重新启动消费端,注意此时该队列绑定了两个RoutingKey,那么生产者无论匹配到哪个都可以将消息投递到该队列中。我们在管控台将原来的路由规则进行解绑,如图所示,点击Unbind按钮。

再次启动生产端,此时消费端控制台打印了两条消息,说明最后一条消息:user.delete.abc没有匹配上,因为使用的是 * 匹配,这就是 Topic Exchange 的路由方式。

Fanout Exchange

  • 不处理路由键,只需要简单的将队列绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  • Fanout交换机转发消息是最快的(性能最好)

来看一下 Fanout Exchange 的图解,意思就是消息不走任何的路由规则,只有队列和交换机有绑定关系就能收到消息

生产端

不设置路由键直接发送消息到Fanout Exchange

        //省略创建连接和通道...
        //4 声明
        String exchangeName = "test_fanout_exchange";
        //5 发送
        for(int i = 0; i < 5; i ++) {
            String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            //不设置路由键或者设置任意内容均可
            channel.basicPublish(exchangeName, "", null , msg.getBytes());          
        }
        //关闭连接...
消费端

声明一个Fanout Exchange,声明队列,建立交换机和队列的绑定关系,绑定关系不使用RoutingKey

        //省略创建连接和通道...      
        //4 声明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = ""; //不设置路由键
        //声明交换机、队列、绑定关系
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //创建消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息... 

先启动消费端,然后启动生产端,消息被成功消费,这种就是Fanout Exchange,它不走任何的路由规则,直接将消息路由到所有与它绑定的队列。

收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...
收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...
收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...
收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...
收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...

还有一种交换机Headers Exchange,很少使用,是通过消息头进行路由的,通常我们都使用前三种。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容