RabbitMQ 官方demo实践笔记

最近在上云计算,上课讲到RabbitMQ,先试下水(重庆大学2017 刘 李 杨 谢)

Demo 0.环境配置:

  • 服务端(MQ服务)

下载安装erlang 22.3 ,下载安装 RabbitMQ 3.8,无需进一步操作

  • 客户端(代码)

本小组使用Java语言,基于Maven配置:

pom.xml

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>

此时Maven自动导入包,可以在项目中使用MQ了

Demo1.实现Hello World

  • 发送方,消息生产者:

核心代码

    //step 1
    ConnectionFactory factory = new ConnectionFactory();  
    //step 2
    factory.setHost("localhost");   
    //step 3
    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel()) {   
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            //step  4
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    }

核心逻辑

  1. 工厂模式产生一个channel作为通讯的工具
  2. 为其绑定主机号,localhost表示本地
  3. 制造一个channel,并在其中声明一个名叫 QUEUE_NAME 字符串值得队列
  4. 发布这个消息,这个消息位于消息队列中了
  • 接收方,消息的消费者:

核心代码

    //step 1
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //step 2
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //step 3
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
     };
    //step 4
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

核心逻辑

  1. 同发送方,创建工厂和channel;
  2. channel声明队列,这个和发送方代码调用相同,但此时是找到已经创建的MQ而发送方是创建一个MQ
  3. 创建一个接收消息后的逻辑的回调,但是不在这里执行;
  4. 通过basicConsume循环接收消息,执行回调逻辑处理消息

Demo 2. 公平分摊,ACK,持久化

基于demo1的代码,产生如下问题:

  • 消息存在内存中,如果停止服务,重启丢失怎么办?

  • 如果有多个消费者,那么如何决定将消息发给谁?

  • 如何确保消费者处理完消息了呢? && 如何确保公平分摊?

demo2实现的工作队列将用于解决这些问题,具体如下

Q1:持久化问题:

一般情况下,发布消息:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("",QUEUE_NAME, 
                     null,
                     message.getBytes("UTF-8"));

如果需要将消息持久化:

channel.queueDeclare(TASK_QUEUE_NAME,
                     true, false, false, null);//第二个参数为是否持久化
channel.basicPublish("", QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
Q2.多消费者问题

如果有多个消费者:


image.png

默认情况下,会按一定的顺序轮流分配给不同的消费者;也就是说,多个消费者轮流消费队列里的消息

Q3. 确保处理完成&&公平分配

那么,每个消费者会设置一个channel.basicQos(n); 代表消费者能最大n个队列待处理;如果超过这个值,那么消息队列将不会往其中分配;

而MQ是不会主动监视消费者还有多少个消息没处理,通常,都是由消费者处理完了主动通知MQ:

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

消费者的这段代码告诉队列已经成功处理了消息,如果队列持久化了消息,这个时候就可以放心销毁了;否则消息会一直存在,那么也会造成开销甚至存储溢出;

Demo 3.exchange "fanout" 实现发布订阅

当发送方发送一条消息,订阅了该类型消息的所有消费者都会收到。

场景:

​ 用户上传了自己的头像,这个时候图片需要清除缓存,同时用户应该得到积分奖励,你可以把这两个队列绑定到图片上传的交换器上,这样当有第三个、第四个上传完图片需要处理的需求的时候,原来的代码可以不变,只需要添加一个订阅消息即可,这样发送方和消费者的代码完全解耦,并可以轻而易举的添加新功能了。

​ 原理在于,发送方不再直接通过消息队列发送消息,而是通过exchange,而消费者会通过在所在的exchange绑定自己的消息队列;当发送方在exchange发布的时候,exchange会把消息发送给所有的绑定在其上的消息队列发送消息。(下图X为exchange)


image.png
  • 发送方:

核心代码

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);

            channel.basicPublish(EXCHANGE_NAME, "", null, 
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");

核心逻辑

相比之前的demo,发送方不再通过channel.queueDeclare()声明一个queue,取而代之的是声明一个exchange,fannout是广播的模式,还有其他类型:direct topic等:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  //"fanout"模式代表广播发送
  • 接收方:

核心代码

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        //...
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Consumer Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    

核心逻辑

同样,接收方也通过exchangedeclare连接到exchange;这个时候通过channel会为其分配一个随机命名的MQ,并通过queueBind()绑定到exchange,该MQ为了在最后一行basicConsume()中获取的这个MQ执行回调函数,通过getQueue() 获取到该随机队列名,随后正常消费队列消息

Demo 4. exchange "direct" 选择性发布订阅

对于demo3, exchange的fanout将发送方每一个消息都发送给所有的消费者,这是一种无差别的分配,但有的时候是需要合理过滤的:

场景:
​ 一个日志产生器会产生日志,而多个消费者会消费日志,但是有的消费者只会对【warn】级别的感兴趣,而有的只负责处理【info】【error】级别的日志;这个情况下,如果使用fanout,消费者就会收到自己不感兴趣的消息,exchange对这些消费者不感兴趣的MQ根本没必要发送消息。


image.png
  • 发送方:发送 info warn error三种类型的日志

核心代码:

            //前面照常创建channel、 factory 
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            Scanner sc=new Scanner(System.in);
            while(sc.hasNext()){
                String severity = sc.nextLine();   //日志级别
                String message = severity+" message";

                channel.basicPublish(EXCHANGE_NAME,
                        severity,                  //发往对此级别感兴趣的管道
                        null, message.getBytes("UTF-8"));
            }
        }

核心逻辑:

  1. 声明exchange的时候指定类型"direct"
  2. 输入感兴趣的级别 warn error info等
  3. 发往对应感兴趣的MQ


    image.png
  • 接收方:选择性接收自己感兴趣的log类型

核心代码

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();

        String [] interestLog=new String[]{"info"};//对info类型感兴趣
        for (String severity : interestLog) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

核心逻辑

  1. 同样声明,找到同名exchange,类型也是“direct”

  2. 根据感兴趣类型,绑定对应的队列(多个感兴趣事件就绑定多个MQ)

  3. 设置future回调函数处理感兴趣的事件:

对info感兴趣的消费者,只能收到Info:

image.png

对 error,warn感兴趣的消费者,只能收到error 和 warn

image.png

Demo. 5 exchange "Topic"更灵活的匹配发布订阅

​ 尽管使用direct类型的exchange对fanout进行了改进,但它仍然存在局限性:它不能基于多个条件进行路由。

场景:

​ 日志系统并不只会产生[warn info error]等级信息,还会涉及作者[user admin guest],以及别的信息,这多个属性往往是不相干的,如果用direct,会有很大的限制

Topic 类型的exchange使用 xx.xx.xx.xx的点段式路由key对管道进行划分,发布的消息的key为该格式的,而消费者则只需要care其中某几个自己感兴趣的字段,别的通过*****进行忽略某一段,或者使用#进行模糊匹配:

例如发布了一个“cn.mq.rabbit.error”的消息;

  • 能匹配上的路由键:cn.mq.rabbit.* ; cn.mq.rabbit.#;#.error ; cn.mq.#;#

  • 不能匹配上的路由键:cn.mq..error;*

原理图如下,可以看到一个消费者可以匹配多个topic:


image.png

代码上:

  • 发送方

和DEMO4 基本结构相同,只是对exchange声明为“topic”类型:

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

此外就是发布的时候routingKey变成了点段式字符串:

channel.basicPublish(EXCHANGE_NAME,
            routingKey, //如“a.b.c”    “error.admin.2020”
            null, Rmessage.getBytes("UTF-8"));              

接收方发布消息:


image.png
  • 接收方

接收方同样相比demo4只有两处改动:

topic类型的exchange声明

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

匹配的bindKey变成了点段式字符串

        String [] keys=new String[]{"*.admin.*"  ,  "#.guest.#"};
        for (String bindingKey : keys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

对于上述发送方的输入: 对第二个字段为“admin”和“guset”感兴趣的接收方收到:

image.png

对第一个字段为“Error”感兴趣的接收方收到:

image.png

Topic实现了相比direct更灵活的匹配选择性订阅;

Demo .6 RPC远程调用

RPC远程调用是MQ的一个用法示例,其基本原理还是在于字符串消息的传递,在官方示例中,请求服务方只负责通过RequestQueue简单发送一个数字字符串;

而RPC服务方收到RequestQueue字符串之后进行斐波拉契数列的调用,随后通过ReplyQueue返回结果;

image.png
  • 如何精准返回?

我们知道,通常服务端只有几个实例,负责响应所有的调用请求,客户端数量远远大于服务端;所以,通常都是多个客户端用一个requestQueue,而自己和服务端之间独享一个replyQueue;

所以客户端在发送请求的时候,会通过设置property中的属性来告诉服务方将返回结果送往自己所在的replyQueue:

客户端

        String replyQueueName =
            channel.queueDeclare().getQueue();//和服务方建立一个随机名字的MQ
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)//告诉服务方返回结果的队列
                .build();

同时我们也看到通过在prop中设置了一个随机的id,可以在接收的时候验证,确保返回来的结果就是当初发出去的:

        String ctag = channel.basicConsume(replyQueueName
                , true, (consumerTag, delivery) -> {
            //equals函数确保发送的和接收的结果的id相同
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                //如果reply有返回值了,那么就往response阻塞队列放置
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }java
        }, consumerTag -> {
        });
        String result = response.take();//阻塞获取RPC结果

通过将response初始化为阻塞队列,当结果没有返回的时候,就会阻塞。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。