luigi使用 - multiple pipeline


一般地,通常使用luigi框架搭建流程都是只有一个pipeline(暂时没有通过google找到有多个Pipeline的教程)

由于工作需要,需要把之前写好的多个流程串联起来作为一个总的pipeline,并且各个pipeline之间有一定的依赖关系


假设有 pipeline_1, pipeline_2, pipeline_3 三个子流程(可单独运行),结构如下:

  class TaskSon(luigi.Task):
    def run(self):
      pass
    def output(self):
      return luigi.LocalTarget("tmp")

  class workflow(luigi.Task):
    def required(self):
      return TaskSon()

同时,有主流程main_pipeline,结构如下:

  class Pipeline1_Task(luigi.Task):
    def run(self):
      # 执行子流程 pipeline_1
      pass
    def output(self):
      # 返回子流程 pipeline_1 的输出
      pass

  class Pipeline2_Task(luigi.Task):
    def required(self):
      # 依赖于子流程 pipeline_1 的输出
      return Pipeline1_Task()
    def run(self):
      # 执行子流程 pipeline_2
      pass
    def output():
      # 返回子流程 pipeline_2 的输出
      pass

  class Pipeline3_Task(luigi.Task):
    def required(self):
      # 依赖于子流程 pipeline_2 的输出
      return Pipeline2_Task()
    def run(self):
      # 执行子流程 pipeline_3
      pass
    def output(self):
      # 返回子流程 pipeline_3 的输出
      pass

  class workflow(luigi.Task):
    def required(self):
      return Pipeline3_Task()
Screenshot.png

这里需要考虑一个问题

  • 如何将子流程的输入输出跟主流程中对应任务的输入输出对接

为了解决这个问题,首先需要考虑,如何将子流程中所有任务的输出反馈到主流程

  • 一般地,流程的结构设计都是有一个主入口(workflow),由主入口任务(在required方法中)初始化并启动其他任务

  • 那么,就需要在workflow任务中把整个流程中其他任务的输出作为一个整体输出:

  class workflow(luigi.Task):
    def required(self):
      return [otherTask()]
    def output(self):
      # *** 这样就可以将主入口所依赖的所有其他任务的输出返回 ***
      return self.input()

既然能够获取到子流程中所有任务的总输出,那么就需要考虑把输出反馈给主流程

  • 考虑到workflow任务获取其他任务的总输出的方法,可以直接将workflow的output方法跟对应主流程任务的output方法结合:
  class Pipeline1_Task()
    def output(self):
      # *** 这样子流程的output就会跟主流程任务的output对接 ***
      # 同时,这样处理在主流程启动时,luigi框架依旧是会检查子流程的输出是否已经完整
      from pipeline1 import workflow as pipeline1
      return pipeline1().output()
  • 至于主流程中的任务的依赖就比较容易处理了:
  class Pipeline2_Task(luigi.Task):
    def required(self):
      # 由于Pipeline1_Task的输出即为子流程pipeline_1的输出,所以这里luigi会检查到子流程pipeline_1的输出是否完整
      return Pipeline1_Task()

子流程的输入输出已经可以跟主流程的输入输出对应上了,那么就需要考虑如何怎么运行子流程

这里是没有办法通过pipeline1.workflow().run()直接执行,因为入口任务是没有重载run方法

  • 所以,这里把子流程作为一个黑箱执行:
  class Pipeline1_Task(luigi.Task):
    def run(self):
      from pipeline1 import workflow as pipeline1
      # *** 黑箱 ***
      luigi.Build([pipeline1()])

由于需要确保主流程中的任务“挂载”的是统一的一个子流程,则可以定义一个变量来储存子流程对象

  class Pipeline1_Task(luigi.Task):
    pipeline = None
    def run(self):
      luigi.Build([self.pipeline])
    def output(self):
      # 由于每个任务在流程中优先执行的是output方法(当任务被依赖的时候luigi会利用output方法检查输出的完整性),所以self.pipeline的初始化应该在output方法内执行
      from pipeline1 import workflow as pipeline1
      self.pipeline = pipeline1()
      return pipeline1.output()

这样,就可以完整地把子流程装载到主流程的任务中

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 224,242评论 6 522
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 95,953评论 3 402
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 171,299评论 0 366
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 60,709评论 1 300
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 69,723评论 6 399
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,236评论 1 314
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,629评论 3 428
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,594评论 0 279
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,135评论 1 324
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,156评论 3 345
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,285评论 1 354
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,914评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,600评论 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,073评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,203评论 1 275
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,798评论 3 381
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,339评论 2 365

推荐阅读更多精彩内容

  • 策划活动需要有创意,需要不断创新。不要做常规的打折、促销、优惠券、买一送一,节假日促销、年终大促、老板跳楼价,太没...
    春天的昵图阅读 474评论 0 0
  • 昨天深夜,丁彦雨航在个人微博上宣布了经过一个礼拜的时间深思熟虑后的决定——即将到来的新赛季继续留在CBA为山东队冲...
    我的名字叫做坚韧阅读 282评论 0 0
  • 金句:我拼尽全力,只为换来一个不确定的奇迹! 把自己一整天锁在家里,目的就是静默思考。 完成了的事项: 1.60s...
    Ada彩英阅读 124评论 0 0
  • 今天去和弟弟吃了麻辣香锅,花了100多元,味道确实美,还喝到了很好喝的茶味满满的甜胚子奶茶,看见了童工,热情而让人...
    简单的J阅读 152评论 0 0