RabbitMQ(六)路由

RabbitMQ官网中文版教程:

http://rabbitmq.mr-ping.com/tutorials_with_python/[4]Routing.html

上述教程示例为pathon版,Java版及相应解释如下:

生产者

package com.xc.rabbit.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Created by xc.
 */
public class RoutingSendDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    // 路由关键字
    private static final String[] routingKeys = new String[] {"info", "warning", "error"};

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        // 创建一个新的连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 发送消息
        for (String severity : routingKeys) {
            String message = "Send the message level : " + severity;
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
        channel.close();
        connection.close();
    }
}

消费者1

package com.xc.rabbitmq.routing;

import com.rabbit.client.*;

import java.io.IOException;

/**
 * Created by xc.
 */
public class ReceiveLogsDirect1 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_logs";
    // 路由关键字
    private static final String[] routingKey = new String[]{"info", "warning", "error"};

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        // 创建一个新的连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 获取匿名队列名称
        String queueName = channel.queueDeclare().getQueue();
        // 根据路由关键字进行多重绑定
        for (String severity : routingKey) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
            System.out.println("ReceiveLogsDirect1 exchange : " + EXCHANGE_NAME +
                    ", queue : " + queueName + ", BindRoutingKey : " + severity);

        }
        System.out.println("ReceiveLogsDirect1 [*] Waiting for messages. To exit press CTRL + C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

消费者2

package com.xc.rabbit.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Created by xc.
 */
public class ReceiveLogsDirect2 {

    // 交换器名称
    private static final String EXCHANGE_NAME = "direct_logs";
    // 路由关键字
    private static final String[] routingKey = new String[]{"error"};

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        // 创建一个新的连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 获取匿名队列名称
        String queueName = channel.queueDeclare().getQueue();
        // 根据路由关键字进行多重绑定
        for (String severity : routingKey) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
            System.out.println("ReceiveLogsDirect2 exchange : " + EXCHANGE_NAME +
                    ", queue : " + queueName + ", BindRoutingKey : " + severity);

        }
        System.out.println("ReceiveLogsDirect2 [*] Waiting for messages. To exit press CTRL + C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

运行结果如下:

由图可知,生产者发出的消息,根据不同的路由,发送到不同的队列,进而被不同的消费者接收。

先跑消费者程序,在跑生产者程序。否则,生产者的消息到达交换器之后,如果没有队列连上交换器, 则消息被直接丢弃。

注意:

  1. Bindings can take an extra routingKey parameter. To avoid the confusion with a basic_publish,parameter we're going to call it a binding key.
    binding key和routing key是一回事,为了避免概念重复,channel.queueBind时叫binding key, channel.basicPublish时叫routing key。

  2. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
    direct交换器的路由规则很简单,消息会路由到binding key与routing key相同的队列。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,868评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,398评论 2 34
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,116评论 3 51
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,363评论 0 1
  • 注:这份文档是我和几个朋友学习后一起完成的。 目录 RabbitMQ 概念 exchange交换机机制什么是交换机...
    Mooner_guo阅读 33,340评论 8 97