APScheduler管理及监控平台

背景

APScheduler是一个非常好用的调度平台,不过目前所有Scheduler的JOB信息都无法通过可视化的方式展示,只能通过后台日志来查看调度信息,对于管理上非常不便。

但是APScheduler非常的强大,已经预留的event功能可以帮助来实现此功能,对于APScheduler原理还不太理解的话,可以参考之前的一篇文章Python定时库APScheduler原理及用法

在使用Flask进行管理后,通过Flask-APScheduler插件来实现对APScheduler的管理以及动态增删JOB的接口实现,以此完成对APScheduler的全方位管理。

目的

本文的目的主要有两部分功能块,第一部分是利用APScheduler的event机制来实现以下两个功能并进行可视化查看

  • 将APScheduler中所有添加的JOB进行状态跟踪
  • APScheduler中每个JOB的生命周期进行跟踪

第二部分是在Flask框架上构建的管理平台上集成Flask-APScheduler插件,完成对APScheduler的管理以及动态增删JOB的接口实现。

实现

集成Flask-APScheduler插件完成APScheduler的动态管理

将APScheduler集成到Flask中

config_name = os.getenv('FLASK_CONFIG') or 'default'
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
# 初始化Sqlarchemy
db.app = app
db.init_app(app)
# 初始化 flask_apscheduler,将scheduler嵌入到flask管理,本地在flask_apscheduler插件中增加add_listener监听所有的job生命周期
flask_apscheduler = CustomAPScheduler(db.session, app=app)
# 启动apscheduler
flask_apscheduler.start()

配置Flask-APScheduler开启对外接口

class Config:
    # apscheduler默认的jobstore
    SCHEDULER_JOBSTORES = {}
    # flask_apscheduler是否对外提供接口
    SCHEDULER_API_ENABLED = True

Flask-APScheduler提供的api如下

def _load_api(self):
    """
    Add the routes for the scheduler API.
    """
    self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
    self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
    self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
    self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
    self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
    self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
    self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
    self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
    self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')

启动后,通过提供的接口进行动态管理

直接动态调用接口添加, 具体的参数需要到apscheduler的源码进行查看

添加JOB举例说明(add_job)

请求添加接口:http://127.0.0.1:5000/scheduler/jobs
请求方法:POST
请求header:
{
    "Content-Type": "application/json"
}
请求body:
{
    "id": "test_add_job",
    "name":"管理平台添加job测试",
    "func": "app:jobs.test.test_job", # 这里就是模块:函数,本地定义的方法保证可以import
    "trigger": "date" # 触发器为指定时间,这里时间没有指定,就是立马执行
}
返回结果:
{
    "id": "test_add_job",
    "name": "管理平台添加job测试",
    "func": "app:jobs.test.test_job",
    "args": [],
    "kwargs": {},
    "trigger": "date",
    "run_date": "2021-03-05T15:17:10.107210+08:00",
    "misfire_grace_time": 1,
    "max_instances": 1,
    "next_run_time": "2021-03-05T15:17:10.107210+08:00"
}

充分利用APScheduler的Event机制

class CustomAPScheduler(APScheduler):
    # scheduler事件映射本地状态
    STATUS_MAPPING = {
        EVENT_JOB_ADDED: 0,
        EVENT_JOB_MODIFIED: 1,
        EVENT_JOB_SUBMITTED: 2,
        EVENT_JOB_EXECUTED: 3,
        EVENT_JOB_REMOVED: 4,
        EVENT_JOB_ERROR: 5,
        EVENT_JOB_MISSED: 6,
        EVENT_ALL_JOBS_REMOVED: 7,
        EVENT_JOB_MAX_INSTANCES: 8
    }

    def __init__(self, session, scheduler=None, app=None):
        super(CustomAPScheduler, self).__init__(scheduler, app)
        self.session = session

    def listener_all_job(self, event):
        """
        监控job的生命周期,可视化监控,并且可增加后续的没有触发任务等监控
        添加到线程做处理
        :param event:
        :return:
        """
        job_id = None
        args = []
        if event.code != EVENT_ALL_JOBS_REMOVED:
            job_id = event.job_id
        if job_id:
            jobstore_alias = event.jobstore
            job = self.scheduler.get_job(job_id, jobstore_alias)
            if job:
                name = job.name
                func = str(job.func_ref)
                trigger = job.trigger if isinstance(job.trigger, str) else str(job.trigger).split("[")[0]
                next_run_time = str(job.next_run_time).split(".")[0]
            else:
                name = None
                func = None
                trigger = None
                next_run_time = None
            args = [name, func, trigger, next_run_time]
        traceback = event.traceback if hasattr(event, 'traceback') else "",
        args.append(traceback)
        t = threading.Thread(target=self.handle_listener_all_job, args=[event.code, job_id, *args])
        t.start()
        t.join()

    def handle_listener_all_job(self, event_type, *args):
        """
        实际处理IO操作
        如何处理一个job_id重复使用的问题,采用本地id自增,如果真有job_id重复的情况,则认为指定的是最后一个job_id对应的任务
        """
        try:
            if event_type == EVENT_JOB_ADDED:
                # 添加任务定义表
                job = ApschedulerJobInfo()
                job.job_id = args[0]
                job.job_name = args[1]
                job.job_func = args[2]
                job.job_trigger = args[3]
                job.job_next_run_time = args[4]
                job.job_status = 0
                self.session.add(job)
                self.session.flush()
                # 增加任务事件表
                job_event = ApschedulerJobEventInfo()
                job_event.job_info_id = job.id
                job_event.event = self.STATUS_MAPPING[event_type]
                self.session.add(job_event)
                self.session.commit()
            elif event_type == EVENT_JOB_MODIFIED:
                # 修改job[取数据库表中job_id最后一个进行修改]
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_name = args[1]
                    job.job_func = args[2]
                    job.job_trigger = args[3]
                    job.job_next_run_time = args[4]
                    job.job_status = 0

                    # 增加任务事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_SUBMITTED:
                # 提交job执行
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 增加任务事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_EXECUTED:
                # 执行job
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 1

                    # 增加任务事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_REMOVED:
                # 删除job
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 5

                    # 增加任务事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_ERROR:
                # 执行job出错
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 2
                    job.job_traceback = args[5]
                    # 增加任务事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_MISSED:
                # job执行错过
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 3
                    job.job_traceback = args[5]
                    # 增加任务事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_ALL_JOBS_REMOVED:
                # 删除所有job
                all_jobs = ApschedulerJobInfo.query.filter(ApschedulerJobInfo.job_status == 0).all()
                for job in all_jobs:
                    job.job_status = 6
                    # 增加任务事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
            elif event_type == EVENT_JOB_MAX_INSTANCES:
                # job超过最大实例
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 4
                    job.job_traceback = args[5]
                    # 增加任务事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
        except:
            LOGGER.exception("执行任务异常")

    def init_app(self, app):
        super(CustomAPScheduler, self).init_app(app)

        # 增加监听函数,监听所有job的生命周期
        self.add_listener(self.listener_all_job,
                          EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES | EVENT_ALL_JOBS_REMOVED | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED | EVENT_JOB_EXECUTED | EVENT_JOB_SUBMITTED)

收集完成数据后进行展示及管理

  • JOB管理


    JOB管理
  • JOB事件执行明细


    JOB事件执行明细

关注公众号“战渣渣”,回复“调度”获得源码

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容