rabbitmq中文教程python版 - 工作队列

源码:https://github.com/ltoddy/rabbitmq-tutorial

工作队列

image

(using the Pika Python client)

本章节教程重点介绍的内容

在第一篇教程中,我们编写了用于从命名队列发送和接收消息的程序。在这一个中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们安排稍后完成任务。我们将任务封装 为消息并将其发送到队列。
在后台运行的工作进程将弹出任务并最终执行作业。当你运行许多工人时,任务将在他们之间共享。

这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。

在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。
我们没有真实世界的任务,比如要调整大小的图像或要渲染的PDF文件,所以让我们假装我们很忙 - 使用 <code>time.sleep()</code> 函数来伪装它。
我们将把字符串中的点(".")数作为复杂度; 每一个点都会占用一秒的“工作”。例如,Hello ... 描述的假任务将需要三秒钟。

我们稍微修改前面例子中的send.py代码,以允许从命令行发送任意消息。这个程序将把任务安排到我们的工作队列中,所以让我们把它命名为new_task.py

import sys

message = ' '.join(sys.argv[1:]) or 'Hello World'

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % message)

我们的旧版receive.py脚本也需要进行一些更改:它需要为邮件正文中的每个点伪造第二个工作。它会从队列中弹出消息并执行任务,所以我们称之为worker.py

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

循环调度

使用任务队列的优点之一是可以轻松地平行工作。如果我们正在积累积压的工作,我们可以增加更多的工作人员,并且这种方式很容易扩展。

首先,我们试着同时运行两个worker.py脚本。他们都会从队列中获取消息,但具体到底是什么?让我们来看看。

您需要打开三个控制台。两个将运行worker.py脚本。这些控制台将成为我们的两个消费者 - C1和C2。

image

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将获得相同数量的消息。这种分配消息的方式称为循环法。请尝试与三名或更多的工人。

消息确认

做任务可能需要几秒钟的时间。你可能想知道如果其中一个消费者开始一项长期任务并且只是部分完成而死亡会发生什么。
用我们目前的代码,一旦RabbitMQ将消息传递给客户,它立即将其标记为删除。在这种情况下,如果你杀了一个工人,我们将失去刚刚处理的信息。
我们也会失去所有派发给这个特定工作人员但尚未处理的消息。

但我们不想失去任何任务。如果一名工人死亡,我们希望将任务交付给另一名工人。

为了确保消息永不丢失,RabbitMQ支持消息确认。消费者发回ack(请求)告诉RabbitMQ已经收到,处理了特定的消息,并且RabbitMQ可以自由删除它。

如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失),RabbitMQ将理解消息未被完全处理,并将重新排队。如果有其他消费者同时在线,它会迅速将其重新发送给另一位消费者。
这样,即使工作人员偶尔死亡,也可以确保没有任何信息丢失。

没有任何消息超时; 当消费者死亡时,RabbitMQ将重新传递消息。即使处理消息需要非常很长的时间也没关系。

消息确认默认是被打开的。在前面的例子中,我们通过 no_ack = True 标志明确地将它们关闭。一旦我们完成了一项任务,现在是时候清除这个标志并且发送工人的正确确认。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)


channel.basic_consume(callback,
                      queue='hello')

使用这段代码,我们可以确定,即使在处理消息时使用CTRL + C来杀死一个工作者,也不会丢失任何东西。工人死后不久,所有未确认的消息将被重新发送。

消息持久性

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非您告诉它不要。需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为持久。

首先,我们需要确保RabbitMQ永远不会失去我们的队列。为了做到这一点,我们需要宣布它是持久的:

channel.queue_declare(queue='hello', durable=True)

虽然这个命令本身是正确的,但它在我们的设置中不起作用。那是因为我们已经定义了一个名为hello的队列 ,这个队列并不"耐用"。
RabbitMQ不允许您使用不同的参数重新定义现有的队列,并会向任何试图执行该操作的程序返回错误。
但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如task_queue

channel.queue_declare(queue='task_queue', durable=True)

queue_declare更改需要应用于生产者和消费者代码。

此时我们确信,即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久 - 通过提供值为2的delivery_mode属性。

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 确保消息是持久的
                      ))

公平派遣

您可能已经注意到调度仍然无法完全按照我们的要求工作。例如,在有两名工人的情况下,当所有奇怪的信息都很重,甚至信息很少时,一名工作人员会一直很忙,
另一名工作人员几乎不会做任何工作。那么,RabbitMQ不知道任何有关这一点,并仍将均匀地发送消息。

发生这种情况是因为RabbitMQ只在消息进入队列时调度消息。它没有考虑消费者未确认消息的数量。它只是盲目地将第n条消息分发给第n位消费者。

image

为了解决这个问题,我们可以使用basic.qos方法和设置prefetch_count = 1。这告诉RabbitMQ一次不要向工作人员发送多个消息。
或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。相反,它会将其分派给不是仍然忙碌的下一个工作人员。

channel.basic_qos(prefetch_count=1)

把它放在一起

我们的new_task.py脚本的最终代码:

#!/usr/bin/env python
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or 'Hello World'

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 确保消息是持久的
                      ))
print(" [x] Sent %r" % message)
connection.close()

而我们的工人 worker.py

#!/usr/bin/env python
import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback,
                      queue='hello')
channel.basic_qos(prefetch_count=1)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

使用消息确认和prefetch_count,您可以设置一个工作队列。即使RabbitMQ重新启动,持久性选项也可让任务继续存在。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,869评论 2 11
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 2,983评论 3 41
  • 赖玉超:做了个梦,万一实现了呢? 2016年2月18日惠州 睡着迷迷糊糊,电话响了,真是好梦被吵醒了,啥好梦呢? ...
    laiyuchao阅读 177评论 0 0