介绍
APScheduler是基于Quartz的一个Python定时任务框架。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。官方参考 https://apscheduler.readthedocs.io/en/latest/userguide.html
安装
pip install apscheduler
如果由于某种原因 pip 不起作用,您可以从 PyPI 手动下载 APScheduler 分发包,解压缩然后安装它 https://pypi.org/project/APScheduler/
$ python setup.py install
使用方法
- 新建一个调度器schedulers
- 添加调度任务
- 运行调度任务
使用举例
特定的时间点触发
from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
print(text)
scheduler = BlockingScheduler()
# 在 2019-8-30 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1'])
# 在 2019-8-30 01:00:00 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2'])
# 在 2019-8-30 01:00:01 运行一次 job 方法
scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3'])
scheduler.start()
固定时间间隔触发
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('{} --- {}'.format(text, t))
scheduler = BlockingScheduler()
# 每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, args=['job1'])
# 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, seconds = 30, start_date='2019-08-29 22:15:00', end_date='2019-08-29 22:17:00', args=['job2'])
scheduler.start()
在特定时间周期性地触发
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('{} --- {}'.format(text, t))
scheduler = BlockingScheduler()
# 在每天22点,每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])
# 在每天22和23点的25分,运行一次 job 方法
scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])
scheduler.start()
通过装饰器scheduled_job()添加方法
添加任务的方法有两种:
- 通过调用add_job()---见上面1至3代码
- 通过装饰器scheduled_job():
第一种方法是最常用的方法。第二种方法主要是方便地声明在应用程序运行时不会更改的任务。该 add_job()方法返回一个apscheduler.job.Job实例,可以使用该实例稍后修改或删除该任务
import time
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('interval', seconds=5)
def job1():
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('job1 --- {}'.format(t))
@scheduler.scheduled_job('cron', second='*/7')
def job2():
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('job2 --- {}'.format(t))
scheduler.start()