项目地址:coleifer/huey
文档地址:huey, a little task queue
Why
其实这种需求很常见 。在跑仿真的时候,为了得到多组数据以供统计分析比对,我们需要将一些程序按照不同参数配置运行多次。为了利用现代计算机的多核性能,我们常常以多进程的方式来运行这些仿真程序以最大化利用硬件资源。
这是我开始使用huey的时候的需求,但是事实上huey的应用远不止于此。和celery类似,你可以把huey用来管理和调度异步任务,或者是用来运行周期性任务以及定时服务。
我最开始的方法是写了一个工具脚本,在这个脚本内部开了一个线程池,然后在将计算任务提交给这个线程池。但是这样有一个问题:如果我已经开始了一组计算任务,而我又希望再增加一组任务,我只能等前一组任务完全结束之后才能再开始下一组。不然前后两组任务会同时开始运行,争抢计算资源,而不是按照我设想的任务的队列的模式。
设想一个具体的例子。我的计算机是8核的,而我的一组计算任务需要以不同的参数运行16次。通过在进程内创建一个8线程容量的线程池,我可以让这16个任务进入队列序次执行,同时执行的现成数不超过8。倘若前一次的16个任务还剩余4个未运行,而我又要再增加16个计算任务,此时以这种模式,我无法将新增的任务放到前一个进程的任务队列里。如果直接强行运行,则会导致前一组任务的4个线程在运行,而后一个任务会调度出8个线程,总计12个线程,这并不能最优地使用多核。
那么这个时候我就需要一个任务队列系统来管理我的计算任务了:每次我要运行一组任务的时候,我将计算任务提交给任务队列系统,由这个系统来调度worker(worker的数量常常设定为CPU的线程数)执行。简而言之,任务队列系统提供了跨进程的队列。
其实之前我用过celery这个任务队列框架,可是celery是用起来“太重”,配置较为复杂。因此我后来选择了huey这个框架。这个框架的好处是:
- 配置起来非常简单
- 支持自动重试失败的任务
下面我来简要介绍一下这个框架的配置方法。当然更多的功能你可以前往其官网研究。
How
安装
pip install huey
配置
# worker.py
from huey import RedisHuey
huey = RedisHuey()
@huey.task()
def add_numbers(a, b):
return a + b
使用
RedisHuey
需要安装redis,这个请自行Google安装方法。如果觉得Redis还不够轻量级。可以选择Sqlite作为broker,具体方法参见huey的文档,我这里不做赘述。
没错,设置就是怎么简单。huey中任务的定义围绕着huey.task
这个装饰器来进行。如果相对任务做更多的功能设置,可以通过为huey.task
传入更多的参数实现。例如设置自动重试:
@huey.task(retries=3)
def add_numbers(a, b):
return a + b
此时任务如果遭遇失败,会尝试重试3次。如果重试之后仍然失败则会抛出异常。
启动Huey Worker
huey提供了huey_consumer.py
这个脚本来快捷启动worker。命令的基本使用方式如下:
huey_consumer.py worker.huey -k process -w 4
注意其中的worker.huey
,你需要在worker.py
的目录下执行这个脚本,而worker.huey
是你import这个对象需要的路径形式。其余常用的flag定义如下:
-w n worker的数量
-k process/thread/greenlet worker使用线程还是进程还是greenlet
-v log输出详细的debug信息
-0
使用Worker
在做好上述配置之后,将计算任务发送给worker执行的过程同普通的函数调用并没有什么分别。
from worker import huey, add_numbers
add_numbers(1, 2)
尽管看起来和普通的函数调用无异,但其实huey在背后将函数及其参数序列化后通过Redis这个broker,提交给了huey_consumer进程。这个进程在反向解析出函数和参数后,调度worker来执行这个函数。值得注意的是,这里的函数调用就变成了异步的,而在调用者侧函数调用的返回结果,则成为一个特殊打包对象,我们可以通过这个对象来获取任务执行的结果。
from worker import huey, add_numbers
res = add_numbers(1, 2)
add_result = res(blocking=True) # get 3
小结
以上是对huey这个任务队列工具的简单介绍,适合之前没有接触过任务队列工具的朋友入门的。事实上huey还为我们提供了更多的功能,包括周期性任务调度,延时调用,以及计算任务的暂停,取消和重新调度,consumer事件管理等等。关于这些内容我会后续更新文章介绍。
这里我给大家提供一个较为通用的用来执行一个Bash命令行命令的worker模板:
import subprocess
import os
from huey import RedisHuey
huey = RedisHuey()
@huey.task(retries=3)
def run_bash_task(cmd, std_out, working_dir):
print("Entring %s" % working_dir)
print("Running command %s, stdout to %s" % (cmd, std_out))
os.chdir(working_dir)
if std_out is None:
subprocess.check_call(cmd)
else:
with open(std_out, "w") as f:
subprocess.check_call(cmd, stdout=f)
print("finish task %s\n" % cmd)