RabbitMQ Exchange类型

image.png

蓝色的框:指的是生产者将消息投递到EXchange上,然后根据routingkey路由到指定队列上
绿色框:消费者监听队列,然后接受消息。
黄色框:消息到达了exchange是路由到哪个队列,要根据routingkey而定。

下面讲解Exchange
Exchange的属性:(大致有个印象就OK,继续往下看)
1.Name:交换机名称;
2.Type:交换机类型 direct,topic,fanout,headers;
3.Durability:是否需要持久化,true为持久化,代表交换机在服务器重启后是否还存在;
4.Auto Delete :当最后一个绑定到exchange上的队列删除后,自动删除该exchange.
5.Internal:当前的exchange是否用于rabbitmq内部使用,默认为false.
6.Arguments:扩展参数,用于扩展AMQP协议自制化。

Exchange类型以及讲解

1.Direct exchange

image.png
image.png

生产者代码:

public class Producer4DirectExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        //5 发送
        
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());         
        
    }
    
}

消费者的代码:
此时需要channel发送消息指定的routingkey和绑定exchange和队列时候的routingkey相同,直接路由到这些队列上。
如果不指定exchangeType,那么就是default Exchange,此时不需要将队列绑定到exchange.但是Routekey需要完全匹配。

public class Consumer4DirectExchange {

    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        
        //表示声明了一个交换机
        //参数说明:Exchange的属性:
        //1.Name:交换机名称;
        //2.Type:交换机类型 direct,topic,fanout,headers;
        //3.Durability:是否需要持久化,true为持久化;
        //4.Auto Delete :当最后一个绑定到exchange上的队列删除后,自动删除该exchange.
        //5.Internal:当前的exchange是否用于rabbitmq内部使用,默认为false.
        //Arguments:扩展参数,用于扩展AMQP协议自制化
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        
        //参数说明:
        //1.消息队列:实际存储消息数据
        //2.Durability:是否持久化,3.auto_delete:如果选yes,代表最后一个监听被移除之后,该队列会自动被删除。
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}

2.topic exchange

此方式是算是routingkey的通配符匹配模式,两张图片说明问题

符号“#” 匹配一个或多个词
符号“”匹配不多不少一个词
例如:“log.#”能够匹配到“log.info.oa”
"log.
"只会匹配到“log.erro”
如下图

image.png

生产者代码(用了三种routingkey发送)

public class Producer4TopicExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 发送
        
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());    
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
        channel.close();  
        connection.close();  
    }
    
}

消费者代码:

public class Consumer4TopicExchange {

    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        //String routingKey = "user.*";
        String routingKey = "user.*";
        // 1 声明交换机 
        channel.exchangeDeclare(exchangeName, exch angeType, true, false, false, null);
        // 2 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 3 建立交换机和队列的绑定关系:
        //绑定关系中指定routingkey
        channel.queueBind(queueName, exchangeName, routingKey);
         
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}

3.Fanout Exchange(不需要routingkey,只需要绑定)

image.png

特点是:
1.不处理路由键,只需要简单的将队列绑定到交换机上
2.发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
3.Fanout交换机转发消息是最快的

生产者代码:

public class Producer4FanoutExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_fanout_exchange";
        //5 发送
        for(int i = 0; i < 10; i ++) {
            String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());          
        }
        channel.close();  
        connection.close();  
    }
    
}

消费者代码:

public class Consumer4FanoutExchange {

    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = ""; //不设置路由键
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer); 
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容