** 背景 **
后台开发过程中,一个请求比较耗时,大致过程如下:
- 根据前端传入的两个img_url去第三方下载图片。
- 将获取到的图片进行适当缩放,并解析成base64编码。
- 再将解析好的base64编码的图片拿去请求第三方。
** 处理过程 **
明显的,以上三步均比较耗时。
所以,将其处理为异步是必然的。况且用的是tornado这个单进程框架,不异步岂不是作死。
因为请求会相对比较频繁,单纯的异步还不能完美解决。这个时候,初出茅庐的我当然是想到了Celery(只想到了它)。
具体实施为:
- 当接收到请求时,项目对参数作详细检查,然后发送一个task任务到Celery的BROKER中,然后立即返回“请求成功”的结果至前端(结果仅仅表示前端的请求,后端接收成功,并开始处理)。
- 事先启动好的worker进程(可放在其他服务器上)从BROKER中读取task任务,并执行。
- Celery的worker执行完任务后,将得到的数据写入数据库。
这样处理的好处是:
- 实现了异步,用户不用“耐心”的等待。
- 分布式任务处理,解决的单机的cpu处理能力不足的问题。
这样处理的缺点是:
- 用户不能立马得到结果。
- 需要额外再次请求另一个查询接口才能获取数据。
** 解决的问题 **
在执行task的时候,task中需要读写数据库,但celery的worker是额外的进程,并不能继续使用项目中的数据库对象(用的pony orm)来读写数据库。所以,导致了调试过程中出现MySQL server has gone away或其他类似的数据库连接或映射的错误。
** 解决办法 **
既然是因为数据库连接问题,那么简单,直接再连接一次就行了。刚开始的时候,懵逼般的在每次任务的时候去连接一次。后来在后续的代码阅读的过程中发现这样处理有些不妥啊。任务很多的话岂不是会频繁的去连接?后来发现,在启动worker的命令中,需要指定一个task任务文件(就是被celery的task装饰器修饰的任务函数),所以,可以在task所在文件的最外层作一个数据库的连接。这样的话,最终实现的就是每一个worker均有一个自己的数据库连接对象,就像这样的task文件:
# coding=utf-8
from celery import Celery, platforms
from configs.config import BROKER, DATABASE
from apps.ylzh.ylzh import PylzhProcess
from apps.zs.handler import FaceOneToOneProcess
from apps.commons.database import PonyDatabase
# 加try是因为项目(不是worker)在执行时,已经有了一个PonyDatabase
(这是一个我处理过的pony的Database的单例类)数据库操作对象,
而项目因为要发送task,所以会引入这个类,会导致db再次bind而发生异常
所以,用了try
try:
db = PonyDatabase()
db.bind('mysql', host=DATABASE['host'], db=DATABASE['db_name'], user=DATABASE['username'], passwd=DATABASE['password'] )
db.generate_mapping()
except Exception:
pass
app = Celery("celery_model", broker=BROKER) platforms.C_FORCE_ROOT = True
app.conf.update(CELERY_DEFAULT_QUEUE='default_queue')
@app.task()
def ylzh(user_data):
PylzhProcess().request_data(user_data)
...