Celery结合bottle的实践笔记

Celery.png

问:Celery 是什么?

答:Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。
Celery 专注于实时任务处理,支持任务调度。(来源于网络)

问:适用场景在哪里?

答:如图示(来自:http://blog.csdn.net/xsj_blog/article/details/70181984

image.png

问:生产者和消费者模式定义是什么?

答:
(1)生产者->负责产生数据;
(2)消费者->负责数据处理;
(3)缓冲区->解耦生产者和消费者,减少依赖,主要是通过消息队列来进行两点之间的通讯处理。

图示:


image.png

问:什么是任务队列?

答:任务队列是一种在线程或机器间分发任务的机制。

问:什么是消息队列?

答:消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。

问:职程有什么作用?

答:Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程,职程对消息进行处理。如下图所示:


image.png

问:Celery的架构三部分是哪几个部分?

答:Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

(1)消息中间件
PS: Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括,RabbitMQ,Redis,MongoDB等。

(2)任务执行单元
PS: Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

(3)任务结果存储
PS: Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括Redis,MongoDB,Django ORM,AMQP等。

(任务调度)Celery Beat:任务调度
Celery Beat:任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

来自网络

消息分发与任务调度的实现机制(来自:http://blog.csdn.net/xsj_blog/article/details/70181984

image.png

1:—>producer发出调用请求(message包含所调用任务的相关信息)
2:—>celery服务启动时,会产生一个或多个交换机(exchanges),对应的交换机 接收请求message
3:—>交换机根据message内容,将message分发到一个或多个符合条件的队列(queue)
4:—>每个队列上都有一个或多个worker在监听,在监听到符合条件的message到达后,worker负责进行任务处理,任务处理完被确认后,队列中的message将被删除。

注释:Exchange和Queue都是Rabbitmq中的概念

Exchange:交换机,决定了消息路由规则;

Queue:消息队列;

Channel:进行消息读写的通道;

Bind:绑定了Queue和Exchange,意即为符合什么样路由规则的消息,将会放置入哪一个[消息队列];

调图流程图示:(来自https://www.cnblogs.com/forward-wang/p/5970806.html

image.png

实践步骤:

相关依赖:
image.png
第1步:首先搭建bottle客户端端,进行任务委派:

main.py

#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/30 15:58
@version: v1.0.0
@File: main.py
@文件功能描述:
"""

from bottle import route, run

@route('/')
def index():
    return '访问了首页!'

run(host='127.0.0.1', port=8080, debug=True, reloader=True)

启动wen服务应用访问:


image.png
第2步:编写对应Celery任务模块celery_test
image.png
第3步:编写对应Celery任务模块启动配置文件

setting.py配置(使用配置的方式来启动相关worker来处理 任务):

# coding:utf-8
from datetime import timedelta
from kombu import Exchange, Queue

# 配置消息中间件Broker
BROKER_URL = 'redis://127.0.0.1:6379/0'  

# 配置结果存贮Backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'

# 指定时区,默认是 UTC
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# # 不需要返回任务状态,即设置以下参数为True
# 如果不需要某个任务的结果,应该确保Celery不去获取这些结果。这是通过装饰器@task(ignore_result=True)来做的。如果所有的任务结果都忽略了,就不必定义结果后台。这可以让性能大幅提高。
CELERY_IGNORE_RESULT = True

# 任务序列化和反序列化使用msgpack方案
CELERY_TASK_SERIALIZER = 'json'  

# 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_RESULT_SERIALIZER = 'json'  

# CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显

# celery worker的并发数 也是命令行-c指定的数目,事实上实践发现并不是worker也多越好,保证任务不堆积,加上一定新增任务的预留就可以
CELERYD_CONCURRENCY = 10 

# celery worker 每次去rabbitmq取任务的数量,我这里预取了4个慢慢执行,因为任务有长有短没有预取太多
CELERYD_PREFETCH_MULTIPLIER = 4 

CELERY_ACCEPT_CONTENT = ['json']  # 指定接受的内容类型

# 默认的队列,如果一个消息不符合其他的队列就会放在默认队列里面
CELERY_DEFAULT_QUEUE = "default" 

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('for_add', Exchange('for_task_add'), routing_key='for_task_add'),
    Queue('for_send_email', Exchange('for_task_email'), routing_key='for_task_email'),
)

CELERY_ROUTES = {
    'celery_test.tasks.add': {'queue': 'for_add', 'routing_key': 'for_task_add'},
    'celery_test.tasks.send_mail': {'queue': 'for_send_email', 'routing_key': 'for_task_email'},
}

CELERYBEAT_SCHEDULE = {
    'send_mail': {
        'task': 'celery_test.tasks.send_mail',
        'schedule': timedelta(seconds=30),
    },
    'add': {
        'task': 'celery_test.tasks.add',
        'schedule': timedelta(seconds=10),
        'args': (16, 16)

    }
}

PS:以下代码就解释:

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),# 这是上面指定的默认队列 
    Queue('for_add', Exchange('for_task_add'), routing_key='for_task_add'), # 这是一个for_add队列 凡是for_task_add开头的routing key都会被放到这个队列
    Queue('for_send_email', Exchange('for_task_email'), routing_key='for_task_email'),
# 这是一个or_send_email'队列 凡是for_task_email开头的routing key都会被放到这个队列
)
第4步:编写对应Celery实例

server.py

from  celery import Celery
app=Celery('celery_test',include=['celery_test.tasks'])
app.config_from_object('celery_test.setting')

if __name__=='__main__':
    app.start()
第5步:编写对应任务

tasks.py

# coding:utf-8
from celery_test.server import app

@app.task(bind=True)
def add(self,x, y):
    return x + y


@app.task(bind=True)
def send_mail(self,x, y):
    return x - y
第6步:修改main.py进行任务调用

main.py

from bottle import route, run, redirect

from celery_test import tasks


# @route('/add')
# def index():
#     tasks.add.daley(888, 45)
#     return '访问了add!'


@route('/send_mail')
def index():
    task = tasks.send_mail.delay(888, 45)
    print('访问了send_mail!')
    return redirect('/tasks_status/' + task.id)  # 重定向到首页(可以 )


@route('/tasks_status/<task_id>')
def index(task_id):
    # 获取异步任务结果
    task = tasks.send_mail.AsyncResult(task_id)
    # 等待处理
    if task.state == 'PENDING':
        response = {'state': task.state, 'current': 0, 'total': 1}
        print('PENDING:', response)
    elif task.state != 'FAILURE':
        response = {'state': task.state, 'current': task.info.get('current', 0), 'total': task.info.get('total', 1)}
        # 处理完成
        if 'result' in task.info:
            response['result'] = task.info['result']
        print('处理完成:', response)
    else:
        # 后台任务出错
        response = {'state': task.state, 'current': 1, 'total': 1}
        print('后台任务出错:', response)


run(host='127.0.0.1', port=8080, debug=True, reloader=True)

第7步:启动指定的队列
 celery -A celery_test.server worker -l info -Q for_send_email
image.png

启动成功如图示:


image.png
第8步:启动web服务调用对应的URL请求异步处理异步任务
调用:http://127.0.0.1:8080/send_mail
image.png

即时查看任务处理情况:

http://127.0.0.1:8080/tasks_status/0079d834-d918-4ad7-88dd-f23c5eeb09dc

查看对应的celery的运行 情况:

image.png

问:监控Celery任务执行情况?

答:Flower是基于web的监控和管理Celery的工具.
相关文档:
http://flower-docs-cn.readthedocs.io/zh/latest/
安装pip install flower
启动flower(flower默认的端口是5555.)
celery flower --port=5555 --broker=redis://localhost:6379/0
celery flower --broker=amqp://guest:guest@192.168.xx.xxx:5672//
启动任务查看
celery flower --port=5555 --broker=redis://localhost:6379/0

image.png

访问:127.0.0.1:5555


image.png

进行任务执行:http://127.0.0.1:8080/send_mail
查看任务执行结果

image.png

PS其他命令

============================================================================
前台启动

启动指定的队列
 celery -A celery_test.server worker -l info -Q for_send_email

 celery -A celery_test.server worker -l info -Q for_add

启动定时相关的任务队列
 celery -A celery_test.server beat

 celery -A celery_test.server worker -l info -Q for_send_email

 celery -A celery_test.server worker -l info -Q for_add




============================================================================
后台启动
celery multi start w1 -A proj -l info
celery  multi restart w1 -A proj -l info

# 异步关闭 立即返回
celery multi stop w1 -A proj -l info
# 等待关闭操作完成
celery multi stopwait w1 -A proj -l info

调用任务:
add.apply_async((2, 2), queue='lopri', countdown=10)
# 指定要发送到哪个队列 运行时间延迟countdown
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容