# encoding=utf-8
import logging
import logging.handlers
import queue
import time
import threading
import schedule
import os
import sys
import datetime
from pathlib import Path
import subprocess
import filecmp
import git
from git import Repo
import shutil
import os
#分两个文件夹 本地local和git_pulled 每次git pull以后把py文件所在的tasks文件夹移动到local的文件夹,实现py文件是最新的
# 配置文件:local和git_pulled两个里面两个配置文件作对比,有变化则清掉所有tasks标签的schedule,再新建tasks标签的schedule并按最新配置频率定时跑
# 从git上下载代码
def git_pull_file(con):
try:
print("%s is running" % con)
r = Repo("../git_pulled")
r.remote().pull()
# clone_repo = git.Repo.clone_from('https://code.xingshulin.com/test/project.git', '../git_pulled')
except:
logger.error('拉取代码失败')
def cmp_file(content):
global sem
# 对比git下载的配置文件及使用的配置文件内容是否相同
print("%s is running" % content)
try:
cmp_result = filecmp.cmp('../git_pulled/config.csv', '../local/config.csv')
print(cmp_result)
# 如果不相同,将tag为tasks的任务删除(带tag的任务均为执行py文件的任务),将最新的配置文件、py文件夹内容移动到使用的目录中
if not cmp_result:
# 将tag为tasks的任务删除
# schedule.clear('tasks')
# local py所在文件夹、config删除;再从git pulled 移动
shutil.rmtree("../local/tasks")
os.remove("../local/config.csv")
shutil.move('../git_pulled/tasks', '../local/tasks')
shutil.move('../git_pulled/config.csv', '../local/config.csv')
# 再重新从配置文件中读取内容设置到schedule中
run_new = config_read()
sem = threading.Semaphore(len(run_new) + 2)
logger.info('再次设置最大子线程数量%d' % (len(run_new) + 2))
print(('再次设置最大子线程数量%d' % (len(run_new) + 2)))
for k in range(0, len(run_new)):
logger.info('运行定时任务%s' % run_new[k])
eval(run_new[k])
else:
pass
except OSError:
logger.error('拉取最新文件读取异常')
except:
logger.error('获取最新文件失败')
# log打印,先创建文件夹
log_dir = Path('logs')
if not log_dir.exists():
os.makedirs('logs')
# 创建Logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# 文件Handler
fileHandler = logging.handlers.TimedRotatingFileHandler('logs/run_tasks%s.log' % datetime.datetime.now().strftime('%y%m%d'), encoding='UTF-8', when='D', interval=1, backupCount=7)
fileHandler.setLevel(logging.NOTSET) #输出warning级别以上的日志
# Formatter
formatter = logging.Formatter('%(asctime)s-line %(lineno)d in %(filename)s - %(levelname)s - %(message)s')
fileHandler.setFormatter(formatter)
# 添加到Logger中
logger.addHandler(fileHandler)
files = ''
py_files = ''
weekday = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
#封装读配置文件方法
def config_read():
global files
global py_files
global weekday
try:
# 读取配置文件,文件名+频率
with open('config.csv', 'r') as f:
# csv文件第一行是格式注释,第一行不读跳过
files = f.readlines()[1:]
if len(files)==0:
logger.error("配置文件无内容,请填好配置文件再重新运行")
sys.exit()
print('配置的脚本文件个数',len(files))
logger.info('配置的脚本文件个数%d' % len(files))
except FileNotFoundError:
logger.error("无配置文件,请填好配置文件再重新运行")
sys.exit()
except OSError:
logger.error('配置文件打开失败')
except:
logger.error('配置文件读取异常')
try:
# 对比config.csv里的文件个数和要跑的Py文件数量是否一致
for root, dirs, py_files in os.walk('./tasks'):
print(len(py_files))
if len(files) != len(py_files):
logger.warning('config.csv里的文件个数和要跑的Py文件数量不一致,请检查')
except:
logger.error('脚本文件夹读取异常')
# 定时单位 {'count','at','seconds', 'minutes', 'minute', 'hours', 'hour','days', 'day', 'start', 'end', 'weekday'}删减后只支持'at','seconds','minutes','hours','day'
# filename.py,'at','seconds','minutes','hours','day' 只支持配一个参数值,如果每天跑,必须传at其他不认,day 1-7 对应星期一~星期日
jobs = []
try:
for i in range(0, len(files)):
config = {}
args = files[i].rstrip('\n').split(',')
print('配置文件第%d行内容是%s' % (i+1, files[i]))
# 配置不合法的处理
if args[0].endswith('.py'):
config['command'] = 'python ./tasks/' + args[0] # 换用+拼接,用string.join()不能传变量
else:
logger.warning('%s格式不正确,不会定时跑,其他继续遍历' % args[0])
continue
config['at'] = args[1]
if args[2].isdigit() and args[3].isdigit() and args[4].isdigit():
config['seconds'] = args[2]
config['minutes'] = args[3]
config['hours'] = args[4]
else:
logger.warning('%s时/分/秒格式不正确,不会定时跑,其他继续遍历' % args[0])
continue
if args[5].isdigit() and int(args[5]) in range(0,8):
config['day'] = args[5]
else:
logger.warning('%s星期%s格式不正确,不会定时跑,其他继续遍历' % (args[0],args[5]))
continue
jobs.append(config)
except NameError:
logger.error('配置文件变量获取失败')
except ValueError:
logger.error('配置定时参数类型转换异常')
except:
logger.error('配置定时参数异常')
logger.info('配置文件和定时参数如下')
logger.info(jobs)
# 获取各文件名和配置的频率,抽出来所有不为空的值和它对应的键,存到数组
each = []
try:
for i in jobs:
content = {}
for key, value in i.items():
if value != '0':
content[key] = value
each.append(content)
except NameError:
logger.error('配置定时参数不全为0的内容获取失败')
except:
logger.error('定时参数不全为0的获取异常')
# 拼接command_job语句
com_job = []
try:
for e in each:
key = list(e.keys())
if len(key) != 2:
print('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % e['command']) # 命令和频率放在同一个字典,key长度不为2则不添加schedule列表
logger.warning('%s所配置的频率不支持,这个文件不跑,其他文件正常跑' % e['command'])
# 判断条件不能是key[1] == 'at',key每次排序都不一样
elif 'at' in key:
# schedule.every().day.at("19:13").do(thread,job,"python tasks/4.py")
s = 'schedule.every().day.at(\"%s\").do(thread,sem,job,\"%s\")' % (e['at'], e['command'])
print('每天几点跑', s)
com_job.append(s + '.tag("tasks")')
elif 'day' in key:
k = int(e['day']) - 1
s = 'schedule.every().%s.do(thread,sem,job,\"%s\")' % (weekday[k], e['command'])
print('每星期几跑', s)
com_job.append(s + '.tag("tasks")')
elif key[1] or key[0] in ['seconds', 'minutes', 'hours']:
print('每几秒、分、时跑', key)
# key[0] key[1]每次跑的结果不一样,所有不能以角标取固定的值
if 'seconds' in key:
s = 'schedule.every(%s).seconds.do(thread,sem,job,\"%s\")' % (e['seconds'], e['command'])
com_job.append(s + '.tag("tasks")')
elif 'minutes' in key:
s = 'schedule.every(%s).minutes.do(thread,sem,job,\"%s\")' % (e['minutes'], e['command'])
com_job.append(s + '.tag("tasks")')
elif 'hours' in key:
s = 'schedule.every(%s).hours.do(thread,sem,job,\"%s\")' % (e['hours'], e['command'])
com_job.append(s + '.tag("tasks")')
print('配置的定时任务是:', com_job)
except NameError:
logger.error('schedule语句拼接失败,其中有参数获取异常')
except:
logger.error('schedule语句错误')
return com_job
def job(command):
# command 如'python ./tasks/1.py'
subprocess.run(command)
# 开启多线程 子线程发生的异常打印到log日志
def thread(sem_ar,job_func, command):
# 判断线程释放了就开启新线程
if sem_ar.acquire(False):
t = MyThread(sem_ar, job_func, command)
t.start()
class MyThread(threading.Thread):
def __init__(self,my_sem,func,command):
super().__init__()
self.my_sem = my_sem
self.func = func
self.command = command
def run(self):
try:
self.func(self.command)
except:
logger.error('子线程异常')
self.my_sem.release()
# 控制最大子线程数:要跑的脚本文件总数 和文件对比、git pull两个线程
run_job = config_read()
sem = threading.Semaphore(len(run_job)+2)
logger.info('设置最大子线程数量%d' % (len(run_job)+2))
print(('设置最大子线程数量%d' % (len(run_job)+2)))
# schedule.every(1).hours.do(thread,sem, git_pull_file, "git_pull_file")
schedule.every().day.at("14:57").do(thread,sem, cmp_file, "cmp_file")
# 每个配置文件的频率各起线程跑 如'schedule.every(1).seconds.do(thread,sem,job,"python tasks/1.py").tag("tasks")'
for j in range(0, len(run_job)):
logger.info('运行定时任务%s' % run_job[j])
eval(run_job[j])
# #获取当前线程数量和内容
# def get_thread():
# print('当前线程', len(threading.enumerate()),threading.enumerate())
# #每五秒获取一次
# schedule.every(5).seconds.do(get_thread)
try:
while 1:
schedule.run_pending()
except TypeError:
logger.error('schedule传参异常')
except:
logger.error('schedule运行异常')
定时任务框架:多线程运行多个schedule任务
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...