定时任务框架:多线程运行多个schedule任务

# encoding=utf-8
import logging
import logging.handlers
import queue
import time
import threading
import schedule
import os
import sys
import datetime
from pathlib import Path
import subprocess
import filecmp
import git
from git import Repo
import shutil
import os

#分两个文件夹 本地local和git_pulled 每次git pull以后把py文件所在的tasks文件夹移动到local的文件夹,实现py文件是最新的
# 配置文件:local和git_pulled两个里面两个配置文件作对比,有变化则清掉所有tasks标签的schedule,再新建tasks标签的schedule并按最新配置频率定时跑
# 从git上下载代码
def git_pull_file(con):
    try:
        print("%s is running" % con)
        r = Repo("../git_pulled")
        r.remote().pull()
        # clone_repo = git.Repo.clone_from('https://code.xingshulin.com/test/project.git', '../git_pulled')
    except:
        logger.error('拉取代码失败')

def cmp_file(content):
    global sem
    # 对比git下载的配置文件及使用的配置文件内容是否相同
    print("%s is running" % content)
    try:
        cmp_result = filecmp.cmp('../git_pulled/config.csv', '../local/config.csv')
        print(cmp_result)
        # 如果不相同,将tag为tasks的任务删除(带tag的任务均为执行py文件的任务),将最新的配置文件、py文件夹内容移动到使用的目录中
        if not cmp_result:
            # 将tag为tasks的任务删除
            # schedule.clear('tasks')
            # local py所在文件夹、config删除;再从git pulled 移动
            shutil.rmtree("../local/tasks")
            os.remove("../local/config.csv")
            shutil.move('../git_pulled/tasks', '../local/tasks')
            shutil.move('../git_pulled/config.csv', '../local/config.csv')
            # 再重新从配置文件中读取内容设置到schedule中
            run_new = config_read()
            sem = threading.Semaphore(len(run_new) + 2)
            logger.info('再次设置最大子线程数量%d' % (len(run_new) + 2))
            print(('再次设置最大子线程数量%d' % (len(run_new) + 2)))

            for k in range(0, len(run_new)):
                logger.info('运行定时任务%s' % run_new[k])
                eval(run_new[k])
        else:
            pass
    except OSError:
        logger.error('拉取最新文件读取异常')
    except:
        logger.error('获取最新文件失败')



# log打印,先创建文件夹
log_dir = Path('logs')
if not log_dir.exists():
    os.makedirs('logs')
# 创建Logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# 文件Handler
fileHandler = logging.handlers.TimedRotatingFileHandler('logs/run_tasks%s.log' % datetime.datetime.now().strftime('%y%m%d'), encoding='UTF-8', when='D', interval=1, backupCount=7)
fileHandler.setLevel(logging.NOTSET)  #输出warning级别以上的日志

# Formatter
formatter = logging.Formatter('%(asctime)s-line %(lineno)d in %(filename)s - %(levelname)s - %(message)s')
fileHandler.setFormatter(formatter)
# 添加到Logger中
logger.addHandler(fileHandler)

files = ''
py_files = ''
weekday = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']

#封装读配置文件方法
def config_read():
    global files
    global py_files
    global weekday
    try:
        # 读取配置文件,文件名+频率
        with open('config.csv', 'r') as f:
            # csv文件第一行是格式注释,第一行不读跳过
            files = f.readlines()[1:]
        if len(files)==0:
            logger.error("配置文件无内容,请填好配置文件再重新运行")
            sys.exit()

        print('配置的脚本文件个数',len(files))
        logger.info('配置的脚本文件个数%d' % len(files))
    except FileNotFoundError:
        logger.error("无配置文件,请填好配置文件再重新运行")
        sys.exit()
    except OSError:
        logger.error('配置文件打开失败')
    except:
        logger.error('配置文件读取异常')

    try:
        # 对比config.csv里的文件个数和要跑的Py文件数量是否一致
        for root, dirs, py_files in os.walk('./tasks'):
            print(len(py_files))
        if len(files) != len(py_files):
            logger.warning('config.csv里的文件个数和要跑的Py文件数量不一致,请检查')
    except:
        logger.error('脚本文件夹读取异常')
    # 定时单位 {'count','at','seconds', 'minutes', 'minute', 'hours', 'hour','days', 'day', 'start', 'end', 'weekday'}删减后只支持'at','seconds','minutes','hours','day'
    # filename.py,'at','seconds','minutes','hours','day' 只支持配一个参数值,如果每天跑,必须传at其他不认,day 1-7 对应星期一~星期日
    jobs = []
    try:
        for i in range(0, len(files)):
            config = {}
            args = files[i].rstrip('\n').split(',')
            print('配置文件第%d行内容是%s' % (i+1, files[i]))
            # 配置不合法的处理
            if args[0].endswith('.py'):
                config['command'] = 'python ./tasks/' + args[0]  # 换用+拼接,用string.join()不能传变量
            else:
                logger.warning('%s格式不正确,不会定时跑,其他继续遍历' % args[0])
                continue

            config['at'] = args[1]
            if args[2].isdigit() and args[3].isdigit() and args[4].isdigit():
                config['seconds'] = args[2]
                config['minutes'] = args[3]
                config['hours'] = args[4]
            else:
                logger.warning('%s时/分/秒格式不正确,不会定时跑,其他继续遍历' % args[0])
                continue
            if args[5].isdigit() and int(args[5]) in range(0,8):
                config['day'] = args[5]
            else:
                logger.warning('%s星期%s格式不正确,不会定时跑,其他继续遍历' % (args[0],args[5]))
                continue
            jobs.append(config)
    except NameError:
        logger.error('配置文件变量获取失败')
    except ValueError:
        logger.error('配置定时参数类型转换异常')
    except:
        logger.error('配置定时参数异常')
    logger.info('配置文件和定时参数如下')
    logger.info(jobs)
    # 获取各文件名和配置的频率,抽出来所有不为空的值和它对应的键,存到数组
    each = []
    try:
        for i in jobs:
            content = {}
            for key, value in i.items():
                if value != '0':
                    content[key] = value
            each.append(content)
    except NameError:
        logger.error('配置定时参数不全为0的内容获取失败')
    except:
        logger.error('定时参数不全为0的获取异常')

    # 拼接command_job语句
    com_job = []
    try:
        for e in each:
            key = list(e.keys())
            if len(key) != 2:
                print('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % e['command'])   # 命令和频率放在同一个字典,key长度不为2则不添加schedule列表
                logger.warning('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % e['command'])
            # 判断条件不能是key[1] == 'at',key每次排序都不一样
            elif 'at' in key:
                # schedule.every().day.at("19:13").do(thread,job,"python tasks/4.py")
                s = 'schedule.every().day.at(\"%s\").do(thread,sem,job,\"%s\")' % (e['at'], e['command'])
                print('每天几点跑', s)
                com_job.append(s + '.tag("tasks")')
            elif 'day' in key:

                k = int(e['day']) - 1
                s = 'schedule.every().%s.do(thread,sem,job,\"%s\")' % (weekday[k], e['command'])
                print('每星期几跑', s)
                com_job.append(s + '.tag("tasks")')
            elif key[1] or key[0] in ['seconds', 'minutes', 'hours']:
                print('每几秒、分、时跑', key)
                # key[0] key[1]每次跑的结果不一样,所有不能以角标取固定的值
                if 'seconds' in key:
                    s = 'schedule.every(%s).seconds.do(thread,sem,job,\"%s\")' % (e['seconds'], e['command'])
                    com_job.append(s + '.tag("tasks")')
                elif 'minutes' in key:
                    s = 'schedule.every(%s).minutes.do(thread,sem,job,\"%s\")' % (e['minutes'], e['command'])
                    com_job.append(s + '.tag("tasks")')
                elif 'hours' in key:
                    s = 'schedule.every(%s).hours.do(thread,sem,job,\"%s\")' % (e['hours'], e['command'])
                    com_job.append(s + '.tag("tasks")')
        print('配置的定时任务是:', com_job)
    except NameError:
        logger.error('schedule语句拼接失败,其中有参数获取异常')
    except:
        logger.error('schedule语句错误')

    return com_job

def job(command):
    # command 如'python ./tasks/1.py'
    subprocess.run(command)



# 开启多线程 子线程发生的异常打印到log日志
def thread(sem_ar,job_func, command):
    # 判断线程释放了就开启新线程
    if sem_ar.acquire(False):
        t = MyThread(sem_ar, job_func, command)
        t.start()

class MyThread(threading.Thread):
    def __init__(self,my_sem,func,command):
        super().__init__()
        self.my_sem = my_sem
        self.func = func
        self.command = command


    def run(self):
        try:
            self.func(self.command)
        except:
            logger.error('子线程异常')
        self.my_sem.release()


# 控制最大子线程数:要跑的脚本文件总数 和文件对比、git pull两个线程
run_job = config_read()
sem = threading.Semaphore(len(run_job)+2)
logger.info('设置最大子线程数量%d' % (len(run_job)+2))
print(('设置最大子线程数量%d' % (len(run_job)+2)))
# schedule.every(1).hours.do(thread,sem, git_pull_file, "git_pull_file")
schedule.every().day.at("14:57").do(thread,sem, cmp_file, "cmp_file")


# 每个配置文件的频率各起线程跑 如'schedule.every(1).seconds.do(thread,sem,job,"python tasks/1.py").tag("tasks")'
for j in range(0, len(run_job)):
    logger.info('运行定时任务%s' % run_job[j])
    eval(run_job[j])


# #获取当前线程数量和内容
# def get_thread():
#     print('当前线程', len(threading.enumerate()),threading.enumerate())
# #每五秒获取一次
# schedule.every(5).seconds.do(get_thread)
try:
    while 1:
        schedule.run_pending()
except TypeError:
    logger.error('schedule传参异常')
except:
    logger.error('schedule运行异常')

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,324评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,356评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,328评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,147评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,160评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,115评论 1 296
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,025评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,867评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,307评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,528评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,688评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,409评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,001评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,657评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,811评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,685评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,573评论 2 353