上篇讲过简单的hello消息,这篇我们将实现一个可以在多个Consumer上发送持久化消息的work queue。
work queue又称为task queue,其主要作用是避免立即执行资源密集型任务,而不必等待完成。相反的,我们可以安排后续完成的任务。我们将任务封装成消息,并将其发送到队列。在后台运行的work进程将弹出任务并最终执行作业,当你运行多个worker时,他们之间还能分担处理任务。
worker:相当于一个Consumer
work queue在某些web应用程序中特别有用,如在短时间HTTP请求窗口中无法处理的复杂任务。
work queue
在这个样例中,由于不是真正的业务场景,所以不能模拟标准的复杂业务,所以我们用time.sleep函数来模拟时间的消耗,我们统计出一个string中"."的个数,用它来模式程序处理业务消耗的时间,每个"."消耗1s。如:“初级赛亚人...”,消耗3s
对于send.go,稍作修改,以允许程序可以从命令行发送任意消息,该消息将被发送至work queue
receive.go也做部分修改,以实现模拟复杂任务的处理耗时操作
现在重新build两个工程,然后在命令端运行:
Round-robin dispatching
使用任务队列的优点之一是能够轻松地并行工作。 如果我们正在建立一个积压的工作,我们可以添加更多的worker程序,这样可以很容易地扩展。
如果我们同时运行两个consumer从一个queue中获取消息,它们会怎样工作呢?来看看它们的运行机制。
看下运行结果:
producer
Consumer1
Consumer2
Message acknowledgment
完成一个任务会消耗一定的时间,考虑一个问题,如果一个consumer开始了一个长时任务,在这个任务完成一部分还未完全完成时,这个consumer就挂掉了,那这个消息就是没处理完成。在目前我们的代码中,rabbitMQ分发一个message到consumer中,就会立即标记这个消息已完成并将它从queue中删除掉,在这种情况下,如果一个worker没处理完一个message就挂掉了那么这个消息将会丢失。
但是我们并不想丢失任何消息,可能每一条都是非常重要的;如果一个worker在处理一个message过程中挂掉了,那么我们更希望这个message会被分发到下一个worker中,而不是因此丢失。
为了保证任何的message永远不会丢失,rabbitMQ支持message acknowledgments。一个Ack(nowledgement)信号将由consumer返回给rabbitMQ,告诉它该消息已被接收并且完成处理,rabbitMQ收到这个ack之后将会从queue中删除该message。
如果一个consumer挂掉(channel关闭,connection关闭,或者tcp连接断开)导致未发送Ack,那么rabbitMQ会知道这个消息未被完全处理并且会重新发送它,如果这是正巧有其他consumer在线,rabbitMQ会迅速的将这个未处理的message重新分发给其他consumer,这样就可以保证没有message会丢失。
Message acknowledgments默认是关闭的;打开ack消息确认机制需要将它设置为false(//auto-ack选项),此后调用msg.Ack(false),worker会发送正确的ack消息到rabbitMQ。
这样就能保证在worker正在处理message时,即使使用ctrl+c,message也不会被丢失。
注:忘记返回Ack是很常见的错误,但是后果确实很严重;当客户端退出时消息将会被重新发送(可能看起来像随机发送),但是rabbitMQ会消耗越来越多的内存,因为它无法释放unacked message。为了调试中这种错误,你可以使用rabbitmqctl打印message_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
windows下用:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Message durability
我们已经知道如何保证消息在发送至consumer的处理过程不丢失,但是当rabbitMQ server挂掉时,消息还是会丢失。为了保证queue和message不会在RabbitMQ server挂掉时造成数据丢失,我们需要将queue和message持久化。
首先,确保queue是durable(持久化的)。
这样就保证了在RabbitMQ重启情况下queue不会丢失,现在需要将message标记为持久性的,在amqp.Publishing中使用amqp.Persistent选项
注:将消息标记为amqp.Persistent并不能解决消息绝不会被丢失的情况。它只是告诉RabbitMQ需要将该消息存储至硬盘,但是在存储过程中有一段存储时间,如果这段时间rabbitMQ Server发生故障,那么这个消息将会造成丢失,所以RabbitMQ不会为每个消息做fsync(2)——仅仅存入cache中,并不会真正写入硬盘。这个持久化策略可能并不够完美,但是它足以满足你的大多数需求。如果你需要足够完美的持久化策略以保证消息绝不会被丢失,可以参考publisher confirms。
Fair dispatch
你肯能注意到目前RabbitMQ的message分发策略(默认round-robin)并不是我们所期望的。比如,有两个worker,当所有的奇数号的message很重,偶数号的message很轻;在round-robin下分发消息时,一个worker会拿到所有奇数号的消息,另一个拿到所有偶数号的消息,这时一个worker就会非常忙碌而另一个则会闲着无所事事。因为RabbitMQ不知道这些事,任然不断的给workers分发消息。
发生这种情况是因为当一个消息进入queue时,RabbitMQ只负责将它分发到consumer,而不去查看这个consumer的unacknowledged messages的数量,而只是负责将第n个消息分发给第n(求余之后的n)个consumer。
为了解决这种情况我们需要将prefetch的值设置为1。告诉RabbitMQ在同一时间给一个worker分发的message不超过1个。换言之,在一个worker处理完一个message并发送ack之前,别再给他分发任何message。这样,这个消息就会被分发到下一个worker进行处理。
receive.go
注:如果所有的worker都很忙,此时queue可能被填充满,你需要注意的是增加更多的worker,或者其他的策略。