python使用Apscheduler定时清理数据

Apscheduler简介

Apscheduler我就不过多赘述了。如果你还不太了解Apscheduler,请参考如下链接:

应用场景示例

话不多说,如下场景:

需要清理某个数据库的数据,保留半年数据,最好在周末自动执行。

使用Apscheduler则可以完美解决该问题。

使用mongodb管理定时任务

def get_scheduler():
    """mongodb存储job,返回任务调度器"""
    scheduler = BackgroundScheduler()
    client = MongoClient(host=configs.MONGO_HOST, port=configs.MONGO_PORT)
    # collection为存储定时任务的文档(表),database为数据库
    store = MongoDBJobStore(collection='scheduler_job', database='admin', client=client)
    scheduler.add_jobstore(store)
    # scheduler.add_job(job, 'interval', seconds=5, id=str(time.time()))
    return scheduler

使用

def schedule_clear_db():
    """定时任务清除数据库"""
    # 每周末晚上2点半删除过期数据(6个月之前的数据,这里的id对应mongodb中的_id
    # day_of_week设置一周中的哪几天执行,使用前三位字母代替,args为参数,这里的6表示6个月,
    # timezone最好设置,linux时区一般为utc,del_historical_data为定时任务实际执行的用来删除数据的函数
    schedule = get_scheduler()
    schedule.add_job(del_historical_data, 'cron', day_of_week='sat-sun', hour=2, minute=30, id="db_clear",
                     args=[6], replace_existing=True, timezone="Asia/Shanghai")
    if schedule.state == 0:
        schedule.start()

在mongo中查看任务

使用图形化工具查看定时任务,这里我使用的是mongodb compass


定时任务信息
  • 格式化数据库定时任务下次执行时间(next_run_time)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(1600453800)))
# 2020-09-19 02:30:00

从结果定时任务启用成功,而且时间一致。

修改定时任务

  • 实际中可能需要随时修改数据的保存时间,在实际web项目(django或者flask等)中加一个接口即可。
  • 修改可以使用reschedule_job、modify_job、add_job方法。结合实际使用即可,这里使用add_job方法。
# add_job 如果id重复则会覆盖原来的任务
schedule.add_job(del_historical_data, 'cron', day_of_week='sat-sun', hour=2, minute=40, id="db_clear",
                     args=[save_months], replace_existing=True, timezone="Asia/Shanghai")

在flask中使用核心代码示例

  • app.py
from datetime import timedelta

from configuration import con, schedule_clear_db
from configs import *
from flask_cors import CORS
from flask import Flask

app = Flask(__name__)
CORS(app, supports_credentials=True)  # 设置全局跨域
app.register_blueprint(con, url_prefix='configuration')  # 路由分发
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = timedelta(seconds=1)  # 缓存时间为1秒

if __name__ == '__main__':
    schedule_clear_db()
    app.run(host='0.0.0.0', port="5000", debug=False, threaded=True)
  • init.py
from flask import Blueprint

con = Blueprint("configuration", __name__)

from configuration.views import *
  • views.py
import flask
from configuration import con
from configuration.impl import *


@con.route("/updateDbSaveTime", methods=["POST"])
def update_clear_db_time_inter():
    """修改清理过期数据时间设置,数据保留默认时间为6个月"""
    data = json.loads(flask.request.get_data(as_text=True))
    save_months = data.get("save_months")
    return update_clear_db_time(save_months)
  • 接口实现类impl.py
def update_clear_db_time(save_months):
    """定时任务清除数据库"""
    # 每周末晚上2点半删除过期数据(6个月之前的数据)
    # add_job 如果id重复则会覆盖原来的任务
    schedule.add_job(del_historical_data, 'cron', day_of_week='sat-sun', hour=2, minute=40, id="db_clear",
                     args=[save_months], replace_existing=True, timezone="Asia/Shanghai")
    if schedule.state == 0:
        schedule.start()
    conn = PyMongoDB()
    find_one = conn.find_one_universal(col_name="scheduler_job", match={"_id": "db_clear"})
    conn.close()
    next_run_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(find_one.get("next_run_time")))
    return {"code": "00", "msg": "success", "next_run_time": next_run_time}

接口测试

测试结果
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。