RabbitMQ -- part2 [ Work Queues]

上节介绍了在命名队列中发送和获取消息,本节介绍创建一个工作队列,然后分发任务到多个消费者(consumer)

work queues
  • 编辑生产者 new_task.py

#!/usr/bin/env python3
# coding=utf-8

import pika
import sys

# 获取终端输入的信息,上传到队列
message = ' '.join(sys.argv[1:]) or 'Hello World!'

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body=message)

print("[x] Sent '%s'" % message)

connection.close()
  • 编辑消费者 worker.py

#!/usr/bin/env python3
# coding=utf-8

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print('[x] Received %r' % body)
    # 通过统计body中点符号的数量来调用time.sleep(),用来模拟真实环境中处理任务的耗时
    time.sleep(body.count(b'.'))    
    print('[x] Done')

channel.basic_consume(callback, queue='hello', no_ack=True)

print('[*] Waiting for messgaes. To exit press CTRL+C')

channel.start_consuming()
  • 轮询调度规则 (Round-robin)

执行以上代码,同时启动一个生产者(new_task.py)和两个消费者(worker.py),可以发现RabbitMQ通过轮询调度的方式将消息分发给消费者。

# 生产者
python3 new_task.py This is a test.
python3 new_task.py This is a test..
python3 new_task.py This is a test...
python3 new_task.py This is a test....
python3 new_task.py This is a test.....
# 消费者1
> python3 worker.py
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'This is a test.'
[x] Done
[x] Received b'This is a test...'
[x] Done
[x] Received b'This is a test.....'
[x] Done
# 消费者2
> python3 worker.py
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'This is a test..'
[x] Done
[x] Received b'This is a test....'
[x] Done

优点:使用 Task Queue 可以轻易的实现平行计算。当工作队列积压时,只需要添加更多的consumer来解决。

  • 消息确认

当消息被consumer取走之后,RabbitMQ会将此消息标记为deletion。这种情况下,如果consumer挂掉,消息没有处理成功,就等于丢失了。但这是不可取的,为确保消息不丢失,RabbitMQ支持consumer在处理完消息之后返回确认,告诉RabbitMQ,此消息已被处理,并且可以删除。
当一个consumer挂掉(它的channel关闭、连接断开或者TCP链接断开),没有发送ack,RabbitMQ会认为此消息没有被处理成功并且会再次放入队列。如果此时有其他consumer在线,则会分发此消息到其他consumer。这样就会避免消息丢失。

Manual message acknowledgments 默认是打开的。之前的代码中使用了 no_ack=True 来关闭ACK。移除此配置,当完成任务之后将会发送ack确认。

#!/usr/bin/env python3
# coding=utf-8

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print('[x] Received %r' % body)
    time.sleep(body.count(b'.'))
    print('[x] Done')
    ch.basic_ack(delivery_tag = method.delivery_tag)    # 任务处理完之后,发送确认

channel.basic_consume(callback, queue='hello')      # 去除 no_ack

print('[*] Waiting for messgaes. To exit press CTRL+C')

channel.start_consuming()

一个常犯的错误是忘记了basic_ack,非常严重,因为无法释放掉未确认的消息,所以消息将被重复投递到客户端,会消耗越来越多的内存。可以通过rabbitmqctl工具调试,显示出为确认的消息:rabbitmqctl list_queues name messages_ready messages_unacknowledged

  • 消息持久化

之前的程序,任务在服务停止之后会丢失。如果RabbitMQ quit或crash,队列和消息将被丢失。要想确保消息不丢失,需要同时标记队列和消息为持久化

  1. 首先确保队列不会丢失:

channel.queue_declare(queue='task_queue', durable=True) # 添加 durable 参数

注意: 之前已经申明了"hello"队列,不能使用不同的参数进行再次申明,否则会报错。可以考虑换一个队列名,例如:task_queue

queue_declare 需要在producer和consumer两端一起修改

  1. 然后标记消息为持久化,添加 delivery_mode 参数,值为2
#!/usr/bin/env python3
# coding=utf-8

import pika
import sys

message = ' '.join(sys.argv[1:]) or 'Hello World!'

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

 # 在producer和cosumer两端一起添加 durable=True,确保队列不丢失
channel.queue_declare(queue='task_queue', durable=True)    

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode = 2,    # 使消息持久化
                      ))

print("[x] Sent '%s'" % message)

connection.close()

注: 这种办法不能完全确保消息不丢失,在RabbitMQ接受消息到保存到磁盘的这个时间窗口中,如果发生意外,消息可能还在缓存中并没有将消息保存到disk中。这种方法不算很健壮,但是可以满足绝大多数简单的任务队列。如果需要完全的不丢失,可以使用 [ publisher confirms ]

  • 均匀的分发

在consumer中添加: channel.basic_qos(prefetch_count=1) ,告诉RabbitMQ不将多个消息分发给同一个consumer。如果处理较繁杂,则队列会堵塞,可以通过添加多个consumer或者使用 [ message TTL ]


参考文档:http://www.rabbitmq.com/tutorials/tutorial-two-python.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,422评论 2 34
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,374评论 0 1
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,082评论 19 139
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 3,053评论 3 41
  • 应用场景 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种: 1.串行的方式 2.并行的...
    lijun_m阅读 1,871评论 0 3