我们有一些自定义的Python模块,希望能够每天定时执行,因而使用了apscheduler
。我们还可能不定期地增加或者删除一些自定义的模块,因而用到了importlib
来动态引入自定义模块。另外,又希望程序能够多线程执行各个自定义模块,则又用到了threading
。
1.用importlab动态引入自定义模块
思路是我们把自定义模块的.py
文件都放到一个文件夹里(如./libs/
),然后每次要执行的时候,通过os.listdir
函数获取到当前在改目录下的文件名称,通过筛选后选出我们需要的,利用importlib
引入,然后执行。
在这里,我们需要注意的是,./libs/
目录下需要有名为__init__.py
的文件,这个文件中不需要有任何内容,但是如果没有的话我们就无法引入我们定义的模块。代码如下所示:
import importlib
import os
lib_list = os.listdir("./libs/")
for lib_fn in lib_list:
if lib_fn.endswith(".py") and not lib_fn.startswith("__"):
my_lib = importlib.import_module("libs.%s" % lib_fn.rstrip(".py"))
my_lib.MyClass(arg1)
上面的代码中,我们挑选出./libs/
下的.py
文件,并且排除开头为"__"
的文件。然后以文件名去掉.py
作为模块名引入即可。另外,我们在代码中假设自定义模块中都定义了一个名为MyClass
的类,这里,为了方便,所有的自定义模块中,我们都定义了名称相同的类。另外,arg1是我们定义的一个参数,可以没有,也可以多个。
importlib
具体的用法,可参考官方文档或者其他的相关文章。
2.用threading进行多线程编程
先直接放代码吧:
import threading
my_thread = threading.Thread(target=my_lib.MyClass, args=(arg1,))
my_thread.start()
如上所示,我们将my_lib.MyClass(arg1)
替换成上面这段代码中的3、4行即可。那么,我们将这两部分的代码整合:
import importlib
import threading
import os
def run_my_libs():
lib_list = os.listdir("./libs/")
for lib_fn in lib_list:
if lib_fn.endswith(".py") and not lib_fn.startswith("__"):
my_lib = importlib.import_module("libs.%s" % lib_fn.rstrip(".py"))
my_thread = threading.Thread(target=my_lib.MyClass, args=(arg1,))
my_thread.start()
为了方便下一步的定时调用,我们将代码整合到了函数run_my_libs
中。
3.用apscheduler定时启动程序。
本部分主要参考了这篇文章。
例如,如果我们设定每天8:30启动程序,那么执行以下代码:
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.add_job(run_my_libs, 'cron', hour=8, minute=30)
scheduler.start()
而如果我们想设定每24小时执行一次,那么执行以下代码:
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.add_job(run_my_libs, 'interval', hours=24)
scheduler.start()
上面的两段代码中'cron'
和'interval'
是两种触发器,具体的信息可参考这篇文章或者官方文档
版权声明:本文为博主原创文章,遵循 CC 4.0 BY 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://ywsun.site/articles/dxc_sche.html