定时任务、Celery、消息队列

python定时任务有以下常见方案

注意gunicorn多worker时可能导致任务重复执行,可使用redis_lock等分布式锁、celery beat 、 gunicorn的preload=True 配置等解决

  • python-crontab 系列
    如python-crontab、django-crontab
    封装了Linux提供的crontab命令
    在Linux上需开启crontab,不支持windows,适用于中小型项目
  • apscheduler 系列
    如apscheduler、django-apscheduler、flask-apscheduler
    支持windows和linux,适用于中小型项目
  • Celery 系列
    如celery、django-celery、flask-celery
    支持windows和linux,支持分布式,配置较复杂,适用于大型项目
  • 自建轮子
import os, sys, time, datetime
import threading
import django
base_apth = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# print(base_apth)
# 将项目路径加入到系统path中,这样在导入模型等模块时就不会报模块找不到了
sys.path.append(base_apth)
os.environ['DJANGO_SETTINGS_MODULE'] ='base_django_api.settings' # 注意:base_django_api 是我的模块名,你在使用时需要跟换为你的模块
django.setup()
from base.models import ConfDict

def confdict_handle():
    while True:
        try:
            loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print('本地时间:'+str(loca_time))
            time.sleep(10)
        except Exception as e:
            print('发生错误,错误信息为:', e)
            continue


def main():
    '''
    主函数,用于启动所有定时任务,因为当前定时任务是手动实现,因此可以自由发挥
    '''
    try:
        # 启动定时任务,多个任务时,使用多线程
        task1 = threading.Thread(target=confdict_handle)
        task1.start()
    except Exception as e:
        print('发生异常:%s' % str(e))

if __name__ == '__main__':
    main()

django-crontab

安装及配置
  1. 安装
pip install django-crontab
  1. 注册
INSTALLED_APPS = (
    'django_crontab',
    ...
)
  1. settings.py 配置
    (时间配置, 'app名.文件名.函数名', 位置参数, 关键字参数, '>> 输出文件路径和名称')
  • 位置参数和关键字参数要么都填要么都不填
  • log需通过print输出
  • 配置的方法如不是view类型,django数据库指令不会自动提交,需配合with transaction.atomic()事务
CRONJOBS = [
    # 每隔5分钟运行一次
    ('*/5 * * * *', 'myapp.crontab.my_scheduled_job'),
    # 每隔6小时运行一次
    ('*/360 * * * *', 'weimob.views.refreshToken', ['James'], {}, '>> /tmp/django-crontab.log')
]

CRONTAB_COMMAND_PREFIX = 'LANG_ALL=zh_cn.UTF-8' # 解决 crontab 中文问题
  1. 常用命令
python manage.py crontab add # 添加所有django-crontab任务到系统crontab
python manage.py crontab show # 查看django-crontab添加过的任务
python manage.py crontab remove # 清除django-crontab任务
crontab -l # 查看所有系统crontab任务

函数内改动无需重新add,因每次调用都是一次独立行为(没有缓存)。
CRONJOBS改动后需要重新add,因每次add生成的hash校验会通不过。


Celery

Celery是一个任务队列管理工具,可用于实现异步接口、定期删除/缓存Redis数据、定期发送消息等。Celery本身不提供消息存储

  • Producer
    生产者,调用Celery的API产生任务并交给任务队列
  • Celery Beat
    任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
  • Brokers
    中间人,指任务队列,仅支持Redis、RabbitMQ。
  • Celery Workers
    消费者,从队列中取出任务并执行。可在多台服务器运行多个消费者提高效率
  • Result Stores / backend
    任务处理完后保存状态信息和结果,可以使用Redis、RabbitMQ或DjangoORM等。


    celery工作流程

消息队列

使用场景
  1. 解耦
    如在订单与库存系统中加入消息队列,使两个系统解耦
  2. 异步任务
    如发送短信、邮件、刷新缓存等
  3. 流量削峰
    如秒杀活动等高并发场景
broker选择
  • Redis
    其list适用于做轻量级的MQ存储,但功能和逻辑需上层应用自行实现
  • RabbitMQ
    使用生产-消费者模式,并引入了Echange(交换器)概念,根据调度策略将生产者的消息转发给符合的Queue,实现解耦

相比于redis的优势:

  1. 发布确认:生产者发布消息给Broker后,会收到Broker的反馈,保证发布成功
  2. 消费确认:消息提交给消费者后,如未成功消费确认,会返回到消息队列
  3. 高可用性:自带集群和负载均衡
  4. 持久化:redis只能将整个数据库持久化,而RabbitMQ可以对每条队列或消息分别设置持久化
基本使用DEMO
  1. 安装redis和celery
    如果celery>=4.0,需要确保redis>=2.10.4
apt-get install redis-server
pip install redis
pip install celery
  1. 建立task
#tasks.py
from celery import Celery
 
app = Celery('tasks',  backend='redis://:yourpassword@localhost:6379/0', broker='redis://:yourpassword@localhost:6379/0') #配置好celery的backend和broker
 
@app.task  #普通函数装饰为 celery task
def add(x, y):
    return x + y

也可以通过app.config_from_object() 加载配置模块:

app.config_from_object('celeryconfig')

# celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
  1. 启动worker(开始从队列中读取任务并执行)
celery -A tasks worker --loglevel=info

可通过celery worker --help查看命令列表,常用包括以下内容:

  • -A, --app
    指定使用的 Celery 实例,必须为module.path:attribute格式,如-A swallow.celery
  • -l, --loglevel [DEBUG(默认)|INFO|WARNING|ERROR|CRITICAL|FATAL]
    日志级别
  • -P, --pool [prefork(默认)|eventlet|gevent|solo|processes|threads]
    并发模式,其默认值prefork在windows上不支持,会报错not enough values to unpack (expected 3, got 0)
    可使用--pool=solo(单进程)或-P solo代替
    也可以使用geventeventlet,但需先用pip下载
  • -c, --concurrency
    同时处理任务的工作进程数量,超出的任务需等待其他任务完成后执行。默认为CPU数
  1. 触发任务
  • 通过delayapply_async直接触发
    当任务完成时result.ready() 为 True,然后用 result.get() 取结果即可。
    确保调用了 result.get()result.forget(),否则资源不会释放
#trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
while not result.ready():
    time.sleep(1)
print 'task done: {0}'.format(result.get())
  • 通过beat定时触发
    celery beat 是一个调度程序,会定期触发任务
    通过celery -A 【beat实例】 beat启动,如celery -A swallow.celery beat -l info
    此处的crontab模块并未调用系统的crontab,只是同名罢了
#settings.py
from celery.schedules import timedelta, crontab
# 默认使用UTC时区,建议改为`'Asia/Shanghai'`
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
    "task_every_30_minutes": {# 该key随便起,不需要调用。
        "task": "task_every_30_minutes",# 实际调用的是这个task
        "schedule": timedelta(minutes=30)
    },
    "task_every_day_start": {
        "task": "task_every_day_start",
        "schedule": crontab(minute=0, hour=0)
    },
    "task_every_even_hour": {
        "task": "task_every_even_hour",
        'schedule': crontab(minute=0, hour='0,2,4,6,8,10,12,14,16,18,20,22'),
    },
}

在django中使用celery
  • 独立使用celery
    需进行相关配置,使celery可以调用django中的内容,如django.setup()等。
  • 使用celery + djcelery
    配置更方便
内存泄漏问题
  1. 不要配合django的settings.DEBUG=True使用,
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容