[用官方文档学习RabbitMQ]——2.RabbitMQ的工作(任务)模式模式——WorkQueue

在第一期里面我们写了个程序,给一个队列命名,并通过这个队列发送、接受消息。在这一期我们将会创建一个Work Queue用于在多个工作之间分配任务。(最后这句实在不知道咋翻译,看不懂的同学自行去官网查阅)

简介

工作队列(Work Queues)也叫任务队列(Task Queues),主要思想是避免立即去执行资源密集型的任务,同时还要等待它的完成。相反的,我们的计划是稍后再完成任务。我们会将任务封装为消息,发送到队列中去。一个在后台运行的工作进程将会Pop(弹出)任务,最终会执行任务。当你运行很多工作的时候,任务将会在他们之间共享。

注意:这个概念在Web应用程序中是很有用的,原因是:一个简短的HTTP request window(请求窗口),是无法处理复杂的请求的。

准备

之前第一期,我们发送了一个包含着“Hello World”的消息。这一期,我们将发送一条用于复杂任务的字符串。我们手头上没有一个真实确切的任务,比如“调整图片的大小”、“渲染pdf文件”,之类的,所以让我们来假装我们很忙(官方也够萌)——通过使用thread.sleep()函数把我们伪装成很忙的样子。我们将会以字符串中“.”的数量作为它的复杂性;每个"."将会占用一秒的时间。比如说,“Hello..."需要三秒去处理。

在这里官方没有写工具类,我建议写一个工具类,以便于自定义连接:

ConnectionUtil.java
public class ConnectionUtil {
    public static Connection getConnection() throws IOException {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);
        //填充账户信息
        factory.setVirtualHost("{你自定的vhost}");
        factory.setUsername("{你的用户名}");
        factory.setPassword("{你的密码}");
        //获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

接下来...

我们将会稍微修改一下之前的Sending程序,让它能从命令行发送任意消息。这个程序将能够把任务安排到我们的工作队列中,所以我们把它命名为NewTask.java

官网只贴了一部分,我很贴心给你们完整的!快夸我(☆▽☆)

NewTask.java
public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //从连接中创建通道
        Channel channel = connection.createChannel();
        //创建队列
        boolean durable = true;
        //创建队列
        channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
        //待传递的消息内容
        String message = getMessage(args);
        //传送
        channel.basicPublish("","hello",null,message.getBytes());
        System.out.println("[x] Sent '"+message+"'");
        //关闭连接通道
        channel.close();
        connection.close();

    }
    private static String getMessage(String[] strings) {
        if (strings.length<1){
            return "hello world";
        }
        return joinStrings(strings," ");
    }
    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0){
            return "";
        }
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1;i < length; i++){
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

(敲黑板!!!)getMessage和joinStrings两个方法是帮我们从命令行参数(args)获取消息的~

我们旧的Receiving.java也需要一些改变,它需要接受到消息,并从消息中发现".",每个"."会为我们伪造一秒钟的工作。它将处理传递的消息,并执行任务。所以我们另写一个类,给他起名为Worker.java

Worker.java
public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";


    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //连接获取通道
        Channel channel = connection.createChannel();
        boolean durable = true;
        //创建队列
        channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    doWork(message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[x] Done");
                }
            }
        };
        
        boolean autoAck = true;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);
    }

    private static void doWork(String task) throws InterruptedException {
        for (char ch:task.toCharArray()){
            if (ch=='.'){
                Thread.sleep(1000);
            }
        }
    }

}

基本上没有太多的变化,加了个doWork方法,这个方法是我们用来假装我们很忙,模拟执行时间的方法,不明白自己看下代码就知道它是干啥的了~

接下来就给大家看看结果!

第一期的程序没给大家演示结果,不过那个很简单,自己可以随便玩,试试先看生产者后开消费者,或者把顺序调过来,总之程序在这,具体心得每个人都不同~第一期就不演示啦。第二期需要说一下里面的玄机,所以给大家看看这里的效果。

结果以及思考

我的编译器是主流的IDEA,我们可以开启两个或多个Worker准备接收。这里我开两个。

根据咱们上面的代码,NewTask的main方法需要有入参才能看得到效果。具体方式是:发送和接受都运行一下,确保简单的消息能发送过去,默认没有入参是发送一条hello world,先证明自己的程序是没问题的。然后点击NewTask程序的Edit Configurations然后如图:

设置main函数的入参

看哪个Program argument 对!就是它!我们可以依次设置

"message."、"message.."、"message..."、"message...."、"message....."

记得前面说过一个"."会占用一秒钟

我们去看看,两个Worker都接收到了什么。

第一个:

第一个worker

第二个:


第二个Worker

请忽略"Done",谢谢 ㄟ( ▔, ▔ )ㄏ

Round-Robin Dispatching 循环调度

默认情况下,RabbitMQ 会有顺序的向消费者分发每个消息。当收到ack后,会将该消息删除,然后将下一个消息分发到下一个消费者。平均每个消费者将会得到相同数量的消息。这种分发的方式叫做round-robin——循环调度

使用Work Queue工作队列的优点之一就是能够轻松的并行工作。如果我们积累了大量的工作,我们可以增加更多的Worker,这样我们就能够轻松的扩大规模。

听起来是不是挺高大上的,心里想着这样就一定没问题了吧~~~但是!!!(敲黑板)这样的分发模式是存在问题的!具体问题一会儿再说

Message acknowledgment 消息确认

设计一个情景:一个消费者开始这个很长的任务,并且只完成了一部分,会发生什么?使用当前的代码,一旦RabbitMQ将消息传递给了客户,它将会立即从内存中删掉这个消息。这样的前提下,我们突然结束掉这个worker,那么这条信息处理的那部分的任务就消失了,而且更扎心的是,发送给这个worker的所有未被处理的信息也都不见了。悲剧啊(@ο@) !!!

我们不想失去任何任务,如果一个worker倒下了,我们希望把他的任务交给其他worker。

为了确保消息不丢失,RabbitMQ支持消息确认——Message acknowledgment,一个ACK会被消费者发回给RabbitMQ,该消息被接收、处理,这样RabbitMQ就可以自由的删除它了。如果一个消费者死亡了(通道关闭啦、连接关闭啦、TCP连接丢了啊...),这样它就无法发送ACK,RabbitMQ将会知道:信息没有完全被处理,需要重新排队!如果有其他的消费者存在,他会很快的把这条消息转移给另外一个消费者。这样就不用担心信息丢失的问题了。

没有超时这么一说,RabbitMQ会在消费者死亡时重新传递消息,即使处理这个消息将会占用很久很久很久...消息确认是在默认情况下开启的,例子里面我们用autoACK=true(自动发送ack)的方式把它标记关闭了,现在我们应该给它设置为false了。一旦完成了任务,就发送ack向worker确认一下。

 boolean autoAck = true;
 channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);

这里官方还特意的提醒我们不要忘记basicACK

被遗忘的ACK(有道的翻译我服)

错过basicAck是个很常见的错误。这个错误虽然简单,但是后果很严重。当您的客户端退出(可能一些随机的重启)时,消息将会被重新发送,但是RabbitMQ将会消耗越来越多的内存,因为它将会无法释放未被发送的任何消息。

消息的持久性

好了!今天的重头戏来啦!这里说一下大家关注的消息持久化的问题!

我们已经学会了如何在消费者死亡的情况下,保证任务不丢失。但是,如果RabbitMQ的服务器停止了,我们的任务还是会丢失。当RabbitMQ退出或崩溃的时候,我们会失去所有的队列和消息,除非我们告诉它:”你给我把他们记住!“那我们怎么通知他呢?

我们需要确认两件事:

1.我们要将队列标记为持久化

boolean durable = true;
//创建队列
channel.queueDeclare(”hello“,durable,false,false,null);

这里注意一下hello这个队列名字,已经被定义了,而且它不是持久的。RabbitMQ不允许重新定义具有不同参数的现有队列,而且还会把error返回给执行这个操作的程序。关闭可以解决,更快的方式是另外起个名字,比如”task_queue“

boolean durable = true;
//创建队列
channel.queueDeclare(”task_queue“,durable,false,false,null);

这个名字更改了,所以我们应该将生产者和消费者代码中的这个部分全部都修改为一致的。

2.我们要将消息标记为持久化

这个时候,我们确信及时RabbitMQ重新启动了,task_queue队列也不会丢失了。现在我们要将消息标记为持久型的——通过设置MessageProperties(实现BasicProperties)的value为PERSISTENT_TEXT_PLAIN

channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());

这里官方又一次给了贴心的提示:

Note on 消息持久性

将消息标记为持久化,并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并没有保存它的时候,仍然有短暂的时间间隙。而且RabbitMQ对每条消息都不执行fsync函数(具体这玩意是啥,我也是查百度才知道的)。——它只可能保存到缓存中,而不是真正的写到磁盘里。持久化虽然有,但是并不是很强,但是对于简单的任务队列来说已经足够。如果我们需要更强的保证,那么可以使用publiser confirm(发布者确认)

Fair dispatch 公平分配

可能你会注意到一件事情,虽然这个模式已经解决了很多的问题了,但是还是感觉好像缺了点什么。设计一个情景:公司有两名员工,我们有很多的任务,这些任务有的很沉重,有的很轻松,A员工很倒霉,每次都被分到很沉重的任务,B员工恰巧每次都会被分到十分轻松的任务。这时候我们所谓的公平的分配机制就显得没那么公平了。因为每当消息进入队列的时候,RabbitMQ才会发送一条消息,它不会考虑到消费者未确认消息的数量。这样会导致A员工积攒了很多工作,而B员工没什么事情可做。

公平分配

为了解决这个问题,我们可以使用basicQos方法参数传1.这会告诉RabbitMQ一次不给费者一个以上的信息。换句话说,在处理并确认前面的消息之前,不要向员工发送新消息。相反,它会把消息分配给一个不太忙的消费者,比如之前的员工B。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

来自官方贴心友情提示:

Note about 队列大小

如果所有的消费者都很忙,那么你的队伍就会排的很满很长。这时候应该注意一下这个问题,比如增加更多的消费者,或者一些其他策略

好的,至此第二期翻译完啦。我自己也不是很熟悉,边学习边翻译边整理有错误或者不通的地方希望大家多多指正

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,353评论 2 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 场景2:单发送多接收 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,...
    katkrazy阅读 5,952评论 0 1
  • 继续翻译第一次尝试进行这样模式的学习,感觉好难进行,不过还是要坚持住! 简介 在之前的教程中,我们创建了一个工作队...
    AceCream佳阅读 1,298评论 0 3
  • 昨夜,莫名其妙地删了一个人,今早莫名其妙地被一人删了。 人就是这样,莫名其妙,做不到波澜不惊。 有些人好或者坏,如...
    你背后的人阅读 118评论 0 0