定时任务、Celery、消息队列

python定时任务有以下常见方案

注意gunicorn多worker时可能导致任务重复执行,可使用redis_lock等分布式锁、celery beat 、 gunicorn的preload=True 配置等解决

  • python-crontab 系列
    如python-crontab、django-crontab
    封装了Linux提供的crontab命令
    在Linux上需开启crontab,不支持windows,适用于中小型项目
  • apscheduler 系列
    如apscheduler、django-apscheduler、flask-apscheduler
    支持windows和linux,适用于中小型项目
  • Celery 系列
    如celery、django-celery、flask-celery
    支持windows和linux,支持分布式,配置较复杂,适用于大型项目
  • 自建轮子
import os, sys, time, datetime
import threading
import django
base_apth = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# print(base_apth)
# 将项目路径加入到系统path中,这样在导入模型等模块时就不会报模块找不到了
sys.path.append(base_apth)
os.environ['DJANGO_SETTINGS_MODULE'] ='base_django_api.settings' # 注意:base_django_api 是我的模块名,你在使用时需要跟换为你的模块
django.setup()
from base.models import ConfDict

def confdict_handle():
    while True:
        try:
            loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print('本地时间:'+str(loca_time))
            time.sleep(10)
        except Exception as e:
            print('发生错误,错误信息为:', e)
            continue


def main():
    '''
    主函数,用于启动所有定时任务,因为当前定时任务是手动实现,因此可以自由发挥
    '''
    try:
        # 启动定时任务,多个任务时,使用多线程
        task1 = threading.Thread(target=confdict_handle)
        task1.start()
    except Exception as e:
        print('发生异常:%s' % str(e))

if __name__ == '__main__':
    main()

django-crontab

安装及配置
  1. 安装
pip install django-crontab
  1. 注册
INSTALLED_APPS = (
    'django_crontab',
    ...
)
  1. settings.py 配置
    (时间配置, 'app名.文件名.函数名', 位置参数, 关键字参数, '>> 输出文件路径和名称')
  • 位置参数和关键字参数要么都填要么都不填
  • log需通过print输出
  • 配置的方法如不是view类型,django数据库指令不会自动提交,需配合with transaction.atomic()事务
CRONJOBS = [
    # 每隔5分钟运行一次
    ('*/5 * * * *', 'myapp.crontab.my_scheduled_job'),
    # 每隔6小时运行一次
    ('*/360 * * * *', 'weimob.views.refreshToken', ['James'], {}, '>> /tmp/django-crontab.log')
]

CRONTAB_COMMAND_PREFIX = 'LANG_ALL=zh_cn.UTF-8' # 解决 crontab 中文问题
  1. 常用命令
python manage.py crontab add # 添加所有django-crontab任务到系统crontab
python manage.py crontab show # 查看django-crontab添加过的任务
python manage.py crontab remove # 清除django-crontab任务
crontab -l # 查看所有系统crontab任务

函数内改动无需重新add,因每次调用都是一次独立行为(没有缓存)。
CRONJOBS改动后需要重新add,因每次add生成的hash校验会通不过。


Celery

Celery是一个任务队列管理工具,可用于实现异步接口、定期删除/缓存Redis数据、定期发送消息等。Celery本身不提供消息存储

  • Producer
    生产者,调用Celery的API产生任务并交给任务队列
  • Celery Beat
    任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
  • Brokers
    中间人,指任务队列,仅支持Redis、RabbitMQ。
  • Celery Workers
    消费者,从队列中取出任务并执行。可在多台服务器运行多个消费者提高效率
  • Result Stores / backend
    任务处理完后保存状态信息和结果,可以使用Redis、RabbitMQ或DjangoORM等。


    celery工作流程

消息队列

使用场景
  1. 解耦
    如在订单与库存系统中加入消息队列,使两个系统解耦
  2. 异步任务
    如发送短信、邮件、刷新缓存等
  3. 流量削峰
    如秒杀活动等高并发场景
broker选择
  • Redis
    其list适用于做轻量级的MQ存储,但功能和逻辑需上层应用自行实现
  • RabbitMQ
    使用生产-消费者模式,并引入了Echange(交换器)概念,根据调度策略将生产者的消息转发给符合的Queue,实现解耦

相比于redis的优势:

  1. 发布确认:生产者发布消息给Broker后,会收到Broker的反馈,保证发布成功
  2. 消费确认:消息提交给消费者后,如未成功消费确认,会返回到消息队列
  3. 高可用性:自带集群和负载均衡
  4. 持久化:redis只能将整个数据库持久化,而RabbitMQ可以对每条队列或消息分别设置持久化
基本使用DEMO
  1. 安装redis和celery
    如果celery>=4.0,需要确保redis>=2.10.4
apt-get install redis-server
pip install redis
pip install celery
  1. 建立task
#tasks.py
from celery import Celery
 
app = Celery('tasks',  backend='redis://:yourpassword@localhost:6379/0', broker='redis://:yourpassword@localhost:6379/0') #配置好celery的backend和broker
 
@app.task  #普通函数装饰为 celery task
def add(x, y):
    return x + y

也可以通过app.config_from_object() 加载配置模块:

app.config_from_object('celeryconfig')

# celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
  1. 启动worker(开始从队列中读取任务并执行)
celery -A tasks worker --loglevel=info

可通过celery worker --help查看命令列表,常用包括以下内容:

  • -A, --app
    指定使用的 Celery 实例,必须为module.path:attribute格式,如-A swallow.celery
  • -l, --loglevel [DEBUG(默认)|INFO|WARNING|ERROR|CRITICAL|FATAL]
    日志级别
  • -P, --pool [prefork(默认)|eventlet|gevent|solo|processes|threads]
    并发模式,其默认值prefork在windows上不支持,会报错not enough values to unpack (expected 3, got 0)
    可使用--pool=solo(单进程)或-P solo代替
    也可以使用geventeventlet,但需先用pip下载
  • -c, --concurrency
    同时处理任务的工作进程数量,超出的任务需等待其他任务完成后执行。默认为CPU数
  1. 触发任务
  • 通过delayapply_async直接触发
    当任务完成时result.ready() 为 True,然后用 result.get() 取结果即可。
    确保调用了 result.get()result.forget(),否则资源不会释放
#trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
while not result.ready():
    time.sleep(1)
print 'task done: {0}'.format(result.get())
  • 通过beat定时触发
    celery beat 是一个调度程序,会定期触发任务
    通过celery -A 【beat实例】 beat启动,如celery -A swallow.celery beat -l info
    此处的crontab模块并未调用系统的crontab,只是同名罢了
#settings.py
from celery.schedules import timedelta, crontab
# 默认使用UTC时区,建议改为`'Asia/Shanghai'`
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
    "task_every_30_minutes": {# 该key随便起,不需要调用。
        "task": "task_every_30_minutes",# 实际调用的是这个task
        "schedule": timedelta(minutes=30)
    },
    "task_every_day_start": {
        "task": "task_every_day_start",
        "schedule": crontab(minute=0, hour=0)
    },
    "task_every_even_hour": {
        "task": "task_every_even_hour",
        'schedule': crontab(minute=0, hour='0,2,4,6,8,10,12,14,16,18,20,22'),
    },
}

在django中使用celery
  • 独立使用celery
    需进行相关配置,使celery可以调用django中的内容,如django.setup()等。
  • 使用celery + djcelery
    配置更方便
内存泄漏问题
  1. 不要配合django的settings.DEBUG=True使用,
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容