celery作为一个分布式异步任务队列管理工具,我们需要通过界面化的方式来进行管控任务的执行状态和查看任务执行结果,这里介绍的是flower后端管理页面的内容。
flower的启动
首先flower作为web页面来管理celery后台任务,和任务队列是隔离的,也就是flower的运行与否并不会影响到任务队列的真正执行,但是flower中可以通过API接口来管理celery中的任务执行。
在django工程目录下面(和manage.py文件在一个层级),具体的执行命令:
python manage.py celery flower --basic_auth=floweradmin:12345 --port=8083
由于本人在django工程中已经设置了 BROKER_URL
所以在上述的flower启动命令中就没有再次指定队列url。
flower界面介绍
本片文章着重进行flower界面的介绍,主要是因为没有一个完整的文档来进行介绍,所以下面是详细介绍flower界面中各个页面展示的内容,并且各个字段的含义。
Dashboard
Dashboard 页面是展示异步任务队列的主要情况,该页面包括如下几种状态的任务:
- Active: 表示worker从队列中获取到任务,且正在执行的任务数
- Processed: 表示worker从队列中获取到任务的总数据量
- Failed: 表示worker从队列中获取到任务,且执行失败了的(异常退出)
- Successed: 表示worker从队列中获取到任务,且执行成功了的
- Retried: 表示worker从队列中获取到任务,因为一些其他原因重新执行的数量
所以,上述这些数量的关系如下:
Processed = Active + Received + Failed + Successed + Retried
其中 Received 表示该任务分配到了worker中,但是还没有被处理的任务数量
Worker Name 表示的是执行celery任务的worker名称;
Status 表示的是该worker的状态,包括 Online (在线) 、 Offline(离线),重启flower进程,即可将Offline状态的worker剔除掉;
Active / Processed / Failed / Successed / Retried 分别表示该worker正在执行的任务数、该worker处理的总任务数、处理失败的任务数、处理成功的任务数、重试的任务数;
Load Average 表示系统在 1min / 5min / 15min 内的CPU平均负载(百分比)
Tasks
Tasks 页面是展示所有worker接收到的任务的处理情况。下面对该表格中的做一些介绍
- Name: 表示该任务的名称,默认规则为该函数的路径规则,例如
{模块名}.{文件名}.{函数名}
- UUID: 表示一个唯一字符串ID用于表示该任务
- State: 表示该任务的状态,包括: SUCCESS / FAILURE / STARTED / RECEIVED
- SUCCESS 表示该任务执行完毕且成功
- FAILURE 表示该任务执行失败
- STARTED 表示该任务正在执行
- RECEIVED 表示该任务在worker中,只是被接收而已
- args: 表示该任务的列表参数
- kwargs: 表示该任务的字典参数
- Result: 表示该任务函数的返回结果
- Received: 表示该任务在worker接收到的时间
- Started: 表示该任务在worker开始执行的时间
- Runtime: 表示该任务在worker真正执行的耗时(单位:秒)
- Worker: 表示该任务所在的worker名称
Broker
Broker 页面展示的是celery连接消息队列的信息,包括消息队列的访问URL,下面的截图展示的是链接的RabbitMQ,当然也可以链接Redis等。
- Name: 表示队列的名称
- Messages: 表示该队列的消息数量
- Unacked: 表示该队列中还没收到ack确认的消息数量(该消息可能处于 RECEIVED 或是 STARTED)
- Ready: 表示该队列中还未分配到worker的消息数量
- Consumers: 表示消费者数量(即worker数量)
- Idle since: 表示该队列空闲的最初时间,否则为 N/A
Monitor
Monitor 页面展示的是celery后台任务的曲线展示状况。
写在最后
上面介绍完了flower管理页面的使用,下面说一下,在实际使用过程中,由于队列阻塞,也就是消费者消费速率低于生产者的速率造成的情况,我们该如何处理。
这个时候我们为了快速解决这个问题,想先尽快清空队列该如何操作,重启worker是解决不了问题,并且worker重启之后,原先在worker中正在处理的任务,由于没有返回ack给到队列,这些任务将会重新放回队列中,那么重启worker之后,worker还是依然从队列中获取并重新执行。
像上述截图中,我们使用的是RabbitMQ作为消息队列,那么我们自然可以通过RabbitMQ的命令或者是RabbitMQ-management来操作,但是想像一下,这些后台都是放在平台上,我们无法直接访问操作,该如何去清理,这里提供一下本人通过pika来制作的一个清空队列的函数,并且可以作为HTTP请求来进行操作,可以设置最大清空队列中消息数量来进行处理,也可以全部清空。
具体的代码可以访问git代码。