Apscheduler简介
Apscheduler我就不过多赘述了。如果你还不太了解Apscheduler,请参考如下链接:
应用场景示例
话不多说,如下场景:
需要清理某个数据库的数据,保留半年数据,最好在周末自动执行。
使用Apscheduler则可以完美解决该问题。
使用mongodb管理定时任务
def get_scheduler():
"""mongodb存储job,返回任务调度器"""
scheduler = BackgroundScheduler()
client = MongoClient(host=configs.MONGO_HOST, port=configs.MONGO_PORT)
# collection为存储定时任务的文档(表),database为数据库
store = MongoDBJobStore(collection='scheduler_job', database='admin', client=client)
scheduler.add_jobstore(store)
# scheduler.add_job(job, 'interval', seconds=5, id=str(time.time()))
return scheduler
使用
def schedule_clear_db():
"""定时任务清除数据库"""
# 每周末晚上2点半删除过期数据(6个月之前的数据,这里的id对应mongodb中的_id
# day_of_week设置一周中的哪几天执行,使用前三位字母代替,args为参数,这里的6表示6个月,
# timezone最好设置,linux时区一般为utc,del_historical_data为定时任务实际执行的用来删除数据的函数
schedule = get_scheduler()
schedule.add_job(del_historical_data, 'cron', day_of_week='sat-sun', hour=2, minute=30, id="db_clear",
args=[6], replace_existing=True, timezone="Asia/Shanghai")
if schedule.state == 0:
schedule.start()
在mongo中查看任务
使用图形化工具查看定时任务,这里我使用的是mongodb compass
- 格式化数据库定时任务下次执行时间(next_run_time)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(1600453800)))
# 2020-09-19 02:30:00
从结果定时任务启用成功,而且时间一致。
修改定时任务
- 实际中可能需要随时修改数据的保存时间,在实际web项目(django或者flask等)中加一个接口即可。
- 修改可以使用reschedule_job、modify_job、add_job方法。结合实际使用即可,这里使用add_job方法。
# add_job 如果id重复则会覆盖原来的任务
schedule.add_job(del_historical_data, 'cron', day_of_week='sat-sun', hour=2, minute=40, id="db_clear",
args=[save_months], replace_existing=True, timezone="Asia/Shanghai")
在flask中使用核心代码示例
- app.py
from datetime import timedelta
from configuration import con, schedule_clear_db
from configs import *
from flask_cors import CORS
from flask import Flask
app = Flask(__name__)
CORS(app, supports_credentials=True) # 设置全局跨域
app.register_blueprint(con, url_prefix='configuration') # 路由分发
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = timedelta(seconds=1) # 缓存时间为1秒
if __name__ == '__main__':
schedule_clear_db()
app.run(host='0.0.0.0', port="5000", debug=False, threaded=True)
- init.py
from flask import Blueprint
con = Blueprint("configuration", __name__)
from configuration.views import *
- views.py
import flask
from configuration import con
from configuration.impl import *
@con.route("/updateDbSaveTime", methods=["POST"])
def update_clear_db_time_inter():
"""修改清理过期数据时间设置,数据保留默认时间为6个月"""
data = json.loads(flask.request.get_data(as_text=True))
save_months = data.get("save_months")
return update_clear_db_time(save_months)
- 接口实现类impl.py
def update_clear_db_time(save_months):
"""定时任务清除数据库"""
# 每周末晚上2点半删除过期数据(6个月之前的数据)
# add_job 如果id重复则会覆盖原来的任务
schedule.add_job(del_historical_data, 'cron', day_of_week='sat-sun', hour=2, minute=40, id="db_clear",
args=[save_months], replace_existing=True, timezone="Asia/Shanghai")
if schedule.state == 0:
schedule.start()
conn = PyMongoDB()
find_one = conn.find_one_universal(col_name="scheduler_job", match={"_id": "db_clear"})
conn.close()
next_run_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(find_one.get("next_run_time")))
return {"code": "00", "msg": "success", "next_run_time": next_run_time}