-
基础
- Task
-
每个任务模块以class的形式存在,继承luigi.Task
class TestTask(luigi.Task): //任务参数 _params = luigi.Parameters() def require(self): // 每个任务的入口 return LastTask() def run(self): // 每个任务具体执行内容 pass def output(self): // 每个任务的出口 pass
-
每个任务模块重载方法:
- require方法:
def require(self): return LastTask()
- 这里主要是说明依赖关系,当前任务需要上一个任务LastTask()的执行结果(一般是有文件生成),如若需要可以将结果作为输入
- 可以依赖多个任务
return [LastTask1(), LastTask2()]
or
return {'x': LastTask1(), 'y': LastTask2()}
or
yield LastTask1()
yield LastTask2()
- output方法:
def output(self): return LocalTarget(path)
- 这里主要是说明任务执行的输出结果(一般是输出文件),作为检验任务是否完成的依据
- 返回的是一个luigi.Target对象
- 可以有多个输出结果
return [LocalTarget(path1), LocalTarget(path2)]
- run方法:
- 执行内容中需要把结果写入文件中,路径对应于output方法返回的Target的路径
- 在执行内容前可以用Target的方法makedirs()检查输出文件夹是否存在并创建
- 引用输出路径:
self.output().path
orself.output()['x'].path
orself.output()[0].path
引用依赖路径:self.input().path
orself.input()['x'].path
orself.input()[0].path
- require方法:
-
Parameters
- 流程入口的Task的变量需要由Parameters对象来定义:
class TestTask: test1 = luigi.Parameters() test2 = luigi.Parameters()
- 命令行执行流程时可以直接给Parameters类的变量赋值:
--test1 1 --test2 2
- 流程入口的Task的变量需要由Parameters对象来定义:
-
- Task
-
流程
- 由流程入口workflow定义整个流程中并行运行(yield调用)的任务块 TaskA & TaskB
- 任务调用时,会先检查output方法中定义返回的文件是否存在:
- 如果存在则表示该任务已经成功执行过了,就不会再继续执行,就会从这个节点跳过(包括该节点下的子任务)
- 如果不存在则会检查该任务的require方法中定义返回的任务对象TaskSon,从而跳转检查TaskSon任务的output方法
- 该检查会一直顺着任务树一直往下,直到找到一个已完成的子任务,然后回溯(即递归调用)
- 当任务树中某个任务节点的子任务已完成,则该任务会执行run方法(注,run方法中必须生成output方法中Target包装的文件)