介绍
所有trigger继承自BaseTrigger,IntervalTrigger是其中最为典型的一种。
主要方法 get_next_fire_time,
if 前一次点火时间:
下一次点火时间 = 前一次点火时间 + 间隔
elif 开始日期(第一次点火时间) > now:
下一次点火时间 = 开始日期
else:
延误N次点火
下一次点火时间 = 第N+1次点火时间
点火时间抖动
源代码
from math import ceil
from tzlocal import get_localzone
from apscheduler.triggers.base import BaseTrigger
from apscheduler.util import convert_to_datetime, timedelta_seconds, datetime_repr, astimezone
class IntervalTrigger(BaseTrigger):
"""
Triggers on specified intervals, starting on ``start_date`` if specified, ``datetime.now()`` +
interval otherwise.
:param int weeks: number of weeks to wait
:param int days: number of days to wait
:param int hours: number of hours to wait
:param int minutes: number of minutes to wait
:param int seconds: number of seconds to wait
:param datetime|str start_date: starting point for the interval calculation
:param datetime|str end_date: latest possible date/time to trigger on
:param datetime.tzinfo|str timezone: time zone to use for the date/time calculations
:param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
"""
__slots__ = 'timezone', 'start_date', 'end_date', 'interval', 'interval_length', 'jitter'
def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None,
end_date=None, timezone=None, jitter=None):
self.interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes,
seconds=seconds) # 间隔时间
self.interval_length = timedelta_seconds(self.interval) # 间隔秒
if self.interval_length == 0:
self.interval = timedelta(seconds=1)
self.interval_length = 1
if timezone:
self.timezone = astimezone(timezone)
elif isinstance(start_date, datetime) and start_date.tzinfo:
self.timezone = start_date.tzinfo
elif isinstance(end_date, datetime) and end_date.tzinfo:
self.timezone = end_date.tzinfo
else:
self.timezone = get_localzone()
start_date = start_date or (datetime.now(self.timezone) + self.interval)
self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
self.jitter = jitter
def get_next_fire_time(self, previous_fire_time, now):
if previous_fire_time:
next_fire_time = previous_fire_time + self.interval
elif self.start_date > now:
next_fire_time = self.start_date
else:
# 延误
# 延误时间 D = T0- Ts, 延误次数 N = [D/I], 准备第N+1次点火T1 = Ts + I * N
timediff_seconds = timedelta_seconds(now - self.start_date)
next_interval_num = int(ceil(timediff_seconds / self.interval_length))
next_fire_time = self.start_date + self.interval * next_interval_num
if self.jitter is not None:
# 点火时间抖动
next_fire_time = self._apply_jitter(next_fire_time, self.jitter, now)
if not self.end_date or next_fire_time <= self.end_date:
return self.timezone.normalize(next_fire_time)
def __getstate__(self):
return {
'version': 2,
'timezone': self.timezone,
'start_date': self.start_date,
'end_date': self.end_date,
'interval': self.interval,
'jitter': self.jitter,
}
def __setstate__(self, state):
# This is for compatibility with APScheduler 3.0.x
if isinstance(state, tuple):
state = state[1]
if 'version' in state and state['version']> 2:
raise ValueError(
'Got serialized data for version %s of %s, but only versions up to 2 can be '
'handled' % (state['version'], self.__class__.__name__))
self.timezone = state['timezone']
self.start_date = state['start_date']
self.end_date = state['end_date']
self.interval = state['interval']
self.interval_length = timedelta_seconds(self.interval)
self.jitter = state.get('jitter')
def __str__(self):
return 'interval[%s]' % str(self.interval)
def __repr__(self):
options = ['interval=%r' % self.interval, 'start_date=%r' % datetime_repr(self.start_date)]
if self.end_date:
options.append("end_date=%r" % datetime_repr(self.end_date))
if self.jitter:
options.append('jitter=%s' % self.jitter)
return "<%s (%s, timezone='%s')>" % (
self.__class__.__name__, ', '.join(options), self.timezone)