RabbitMQ官方教程2--Work queues

说明

      在第一个教程中,我们编写了程序来从已命名的队列中接收和发送消息。在本教程中我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务。

      工作队列(即任务队列)的主要思想是避免立即执行资源密集型任务并且还不得不等待它完成。作为替代我们将任务安排在稍后完成。将任务压缩为消息并发送到队列中,后台运行的辅助进程将获得任务并最终执行任务。当运行了多个工作者时,这个任务将在他们之间共享。
      这个概念在web应用程序中特别有用,因为在短时间的HTTP请求窗口中不可能处理复杂的任务。

准备

      现在我们将发送字符串来代替复杂的任务。由于我们没有现实世界中的例子,比如调整图片的大小或者渲染PDF文件,所以通过使用Thread.slaeep()方法来假装自己很忙。令字符串中点的数量来表示任务的复杂度,每个点表示一秒钟,比如Hello...表示这个任务需要花费三秒钟。

      我们将稍微修改一下第一个教程中的Send.java的代码以允许命令行发送任意消息。这个程序将把任务调度到我们的工作队列中,因此我们将它命名为NewTask.java:

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

      之前的Recv.java程序也需要进行一些修改:他需要将消息中的每个点伪造成一秒钟的工作时间。它将处理传递的消息并执行任务,因此我们将其称为Worker.java:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

模拟执行时间的假任务:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

轮询调度

      使用任务队列的优点之一就是可以轻松的并行化工作。如果我们正在积压工作,我们只需要增加更多的工作者就可以很容易的扩大规模。
      首先尝试一下在同一时间运行两个工作者的实例,他们都会接收到队列中的消息,但是究竟是怎么样呢?让我们看一看:你需要启动三个控制台,两个运行工作程序(Worker.java),一个运行发送任务的程序(NewTask.java)

# shell 3 -- 发送端
java -cp $CP NewTask First message.
# => [x] 发送 '第一条消息'
java -cp $CP NewTask Second message..
# => [x] 发送 '第二条消息..'
java -cp $CP NewTask Third message...
# => [x] 发送 '第三条消息...'
java -cp $CP NewTask Fourth message....
# => [x] 发送 '第四条消息....'
java -cp $CP NewTask Fifth message.....
# => [x] 发送 '第五条消息.....'

接收端:

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

      默认情况下,RabbitMQ会将每个消息依次发送给下一个消费者。平均而言每一个消费者将收到相同数量的消息。这种分发消息的方式称为轮询(round-robin)

消息确认

      执行任务可能需要几秒钟的时间。如果一个消费者开始了一项漫长的任务并且仅完成了部分就死掉了,你一定想知道发生了什么。在我们当前的代码中,一旦RabbitMQ向消费者传递了一条消息,它就会立即将其标记为删除。在这种情况下,如果杀死一个工作者,就丢失它正在处理的消息,还将丢失所有发送给这个特定工作者的但尚未处理的消息。但是我们并不想失去任何的任务,如果一个工作者挂了我们希望将任务分配给另一个工作者。

      为了保证消息永不丢失,RabbitMQ支持消息确认机制。消费者发送回确认信息,告诉RabbitMQ已接收并处理了特定消息,RabbitMQ可以自由删除该消息。如果消费者挂掉了(可能是通道被关闭了,连接被关闭了或者TCP连接丢失了等等)没有发送回确认消息即ack,那么RabbitMQ就会知道这条消息没有被完全处理,就会将其重新放入队列中。如果在同一时间有其他的消费者在线那么它将很快被分配给其他消费者。这样就能确保即使有工作者挂掉了也没有消息丢失。

      如果没有任何消息超时,当消费者死亡时,RabbitMQ将重新发送消息。即使处理一条消息花费非常非常长的时间也没有关系。

      默认情况下手动消息确认机制处于打开状态。在上面的例子中我们通过变量autoAck=true显式关闭了它们,现在是时候将该标志设置为false并在完成任务后发送适当的确认。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

      使用此代码可以确保,即使你在工作者处理消息时使用CTRL+C杀掉他也不会有任何的消息丢失。在工作者挂掉后不久所有未确认的消息都会重新发送。【注意】发送确认消息必须使用和接收消息一样的通道,否则将导致channel-level协议异常。

【被遗忘的消息确认】
        忘记basicAck是一个常见的错误。虽然错误很简单,但是后果很严重。当消费者客户端退出时消息将被重新发送(可能看起来像是随机重新发送),RabbitMQ就会消耗越来越多的内存,因为它一直无法释放未确认的消息。调试这种错误可以使用rabbitmqctl打印 messages_unacknowledged字段:
        Linux:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
        Windows:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化

      虽然可以保证即使消费者死亡消息也不会丢失,但是如果RabbitMQ的服务宕机那么任务还是会丢失。除非你告诉RabbitMQ不要那样,否则它在退出或崩溃前将会忘记所有的队列和消息。要确保消息不丢失需要保证两件事:队列和消息都要标记为持久化。

  • 首先,确保在RabbitMQ重启后该队列仍然存在。为此需要将其声明为持久化的:
boolean durable = true;
channel.queueDeclare("ask_queue", durable, false, false, null);
  • 然后,通过设置MessageProperties(实现了接口BasicProperties)的值为PERSISTENT_TEXT_PLAIN确保消息是持久化的:
import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

\color{red}{【有关消息持久性的说明】}
        将消息标记为持久化的并不能完全保证其不会丢失。虽然告诉了RabbitMQ要将消息保存到磁盘,但是当RabbitMQ接受了一条消息并且还没有保存它时,仍然会有一个短时间的窗口期。并且,RabbitMQ不会为每个消息都执行fsync(2)--它可能仅仅是将消息存储在缓存中,没有真正的写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用publisher confirm

公平调度

      你可能已经注意到了,调度仍然无法完全按照我们的要求进行。比如在有两名工人的情况下,当所有奇数消息都很重而偶数消息很轻时,就会导致一个工作者将一直很忙,而另一个工作者将几乎不做任何工作。然而,RabbitMQ对此一无所知,并且任然平均分配消息。

      发生这种情况是因为RabbitMQ在消息进入队列时才调度消息。它不会查看使用者的未确认消息数量,只是盲目的将第n条消息发送给第n个使用者。

      为了解决这一问题,我们可以使用basicQos方法并设置prefetchCount = 1。这告诉RabbitMQ一次不要给工作者一个以上的消息。换句话说,就是在工作者处理并确认上一条消息之前不要再发新的消息给他,而是将消息分配给下一个不忙的工作者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

\color{red}{【关于队列大小的注意事项】}
        如果所有的工作者都处于忙碌状态,那么你的队列就满了。需要注意这一点,或许可以添加工作者或者其他的一些策略。

完整的代码

发送方

NewTask.java : http://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/NewTask.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }

}

接收方

Worker.java : http://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Worker.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,110评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,443评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,474评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,881评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,902评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,698评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,418评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,332评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,796评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,968评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,110评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,792评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,455评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,003评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,130评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,348评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,047评论 2 355