Prefect 折腾手记:编写一个简易工作流程序

文章首发于个人公众号:「阿拉平平」

在处理复杂工作时,将所有的逻辑都写到一个任务中是一种很糟糕的做法。将其拆解成多个子任务,重新编排并监控运行状况则要靠谱的多。也许你正在寻找一个好用的工作流引擎,那么这款基于 Python 的工作流工具:Prefect[1] 说不定可以帮助到你。

在这篇文章中,我将介绍并演示 Prefect 的用法,编写一个简单的工作流程序来说明 Prefect 是如何使用的。文中使用的 Python 版本为 3.6.5,Prefect 版本为 0.13.19。

快速开始

安装 Prefect 前请确保已安装 Python,且版本在 3.6 以上。

安装很简单,执行以下命令:

pip install prefect

官方的示例代码如下:

from prefect import task, Flow, Parameter


@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))


with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)


flow.run(name='world') # "Hello, world!"
flow.run(name='Marvin') # "Hello, Marvin!"

我们运行看下输出结果:

[2020-12-16 11:52:27+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
[2020-12-16 11:52:27+0800] INFO - prefect.TaskRunner | Task 'name': Starting task run...
[2020-12-16 11:52:27+0800] INFO - prefect.TaskRunner | Task 'name': Finished task run for task with final state: 'Success'
[2020-12-16 11:52:27+0800] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2020-12-16 11:52:27+0800] INFO - prefect.TaskRunner | Hello, world!
[2020-12-16 11:52:27+0800] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2020-12-16 11:52:27+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-12-16 11:52:27+0800] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
[2020-12-16 11:52:27+0800] INFO - prefect.TaskRunner | Task 'name': Starting task run...
[2020-12-16 11:52:27+0800] INFO - prefect.TaskRunner | Task 'name': Finished task run for task with final state: 'Success'
[2020-12-16 11:52:27+0800] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2020-12-16 11:52:27+0800] INFO - prefect.TaskRunner | Hello, Marvin!
[2020-12-16 11:52:27+0800] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2020-12-16 11:52:27+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

可以看到,Prefect 很好地执行了任务,并输出了运行的日志。接下来,我们来尝试用 Prefect 编写一个简易的工作流程序。

项目实践

现在有这么个需求:获取 GitHub Trending 每日数据,并保存成 CSV 文件。这个要怎么实现呢?

我们先拆解下需求,大致可以分为以下步骤:

  1. download_data:调用接口,获取 GitHub Trending 每日数据。
  2. handle_data:对数据进行处理,选取需要的字段。
  3. save_data:将处理好的数据保存成 CSV 文件。

功能实现

以上的每个步骤分别对应一个子任务,我们来实现下。

首先是 download_data。通过 requests 库,这个不难实现,具体代码如下:

import requests

GITHUB_TRENDING_URL = "https://trendings.herokuapp.com/repo"

def download_data():
    params = {'since': 'today'}
    trending_data = requests.get(GITHUB_TRENDING_URL, params).json()
    return trending_data

我将接口返回的数据转成了 JSON 格式,数据如下:

{
    "count": 24,
    "msg": "suc",
    "items": [
        {
            "repo": "getmeli/meli",
            "repo_link": "[https://github.com/getmeli/meli](https://github.com/getmeli/meli)",
            "desc": "",
            "lang": "TypeScript",
            "stars": "951",
            "forks": "18",
            "added_stars": "291 stars today",
            "avatars": [
                "[https://avatars3.githubusercontent.com/u/32174276?s=40&v=4](https://avatars3.githubusercontent.com/u/32174276?s=40&v=4)",
                "[https://avatars3.githubusercontent.com/u/13135149?s=40&v=4](https://avatars3.githubusercontent.com/u/13135149?s=40&v=4)"
            ]
        },
... many more records  
    ]
}

接下来是实现 handle_data。由于只需要 items 中的内容,所以对其进行处理,代码如下:

def handle_data(data):
    return [i for i in data["items"]]

最后是 save_data。选取 items 中的字段,保存到本地,代码如下:

import csv

def save_data(rows):
    headers = ["repo", "repo_link", "stars", "forks", "added_stars"]
    with open("/tmp/trending.csv", "w", newline="") as f:
        f_csv = csv.DictWriter(f, headers, extrasaction='ignore')
        f_csv.writeheader()
        f_csv.writerows(rows)

工作流

子任务实现后,就可以用 Prefect 编写工作流了,代码片段如下:

from prefect import Flow

with Flow("GitHub_Trending_Flow") as flow:
    data = download_data()
    rows = handle_data(data)
    save_data(rows)

flow.run()

在 Prefect 中,Flow 用于描述任务之间的依赖关系,比如执行的先后顺序或是数据的传递。创建了 Flow 后,就可以通过调用 flow.run() 来执行。

任务

Prefect 将每个步骤当作一项任务,对应代码中的一个函数。但是,怎么将函数声明为 Prefect 的任务呢?

最简单的方法是使用装饰器 @task,比如将 download_data 声明为任务:

from prefect import task
import requests

GITHUB_TRENDING_URL = "https://trendings.herokuapp.com/repo"

@task
def download_data():
    params = {'since': 'daily'}
    trending_data = requests.get(GITHUB_TRENDING_URL, params).json()
    return trending_data

参数

现在需求有变:需要将获取的 GitHub Trending 数据从每日改成每周。考虑到之后时间还可能发生变化,所以我们将时间改为参数。

导入 Parameter,并将 since 作为参数传入,具体代码如下:

from prefect import task, Flow, Parameter
import requests

GITHUB_TRENDING_URL = "https://trendings.herokuapp.com/repo"

@task
def download_data(since):
    params = {'since': since}
    trending_data = requests.get(GITHUB_TRENDING_URL, params).json()
    return trending_data

with Flow("GitHub_Trending_Flow") as flow:
    since = Parameter("since")
    download_data(since)    

flow.run(since="weekly")

工作流编排

Prefect 提供了开源的 server 以及 UI 来编排工作流。但在使用前,请确保安装了 docker 和 docker-compose。

如果是第一次启动需要运行以下命令配置本地工作流:

prefect backend server

运行后会在 ~/.prefect 目录下生成配置文件,之后运行以下命令启动 server:

prefect server start

启动 server 后,访问 http://localhost:8080,如果 server 不安装在本机,则需要修改 ip 地址。同时也要注意修改主页 「PREFECT SERVER」中 GraphQL 的地址:

执行工作流任务至少需要运行一个 agent,可以在本机开启,命令如下:

prefect agent local start

接下来需要创建项目,可以通过命令行创建项目:

prefect create project "GitHub_Trending"

项目创建后,加入以下代码可以将工作流注册到 server 中,这里的 project_name 要和刚创建的项目名对应:

flow.register(project_name="GitHub_Trending")

运行代码进行注册,选好项目可以看到注册成功的工作流。


接着试下从页面运行工作流,不过别忘了指定参数的值:


运行过程中,我们可以看到每个任务执行所消耗的时间。而在 「SCHEMATIC」中,我们也可以很清晰地了解整个工作流任务的依赖关系。


写在最后

文章中仅演示了 Prefect 的部分功能,事实上,Prefect 中还有许多高级的用法,大家有兴趣的话,可以参考官方文档[2],相信大家完全可以编写功能更复杂的工作流程序。

对这个项目有兴趣的小伙伴也可以读下这篇文章[3],作者编写了个统计疫情数据并上传至 S3 的工作流程序,并在 GitHub 上开源了,很不错的一篇文章。

最后附上示例的完整代码:

#!/usr/bin/env python3

from prefect import task, Flow, Parameter
import requests
import csv

GITHUB_TRENDING_URL = "https://trendings.herokuapp.com/repo"

@task
def download_data(since):
    params = {'since': since}
    trending_data = requests.get(GITHUB_TRENDING_URL, params).json()
    return trending_data

@task
def handle_data(data):
    return [i for i in data["items"]]

@task
def save_data(rows):
    headers = ["repo", "repo_link", "stars", "forks", "added_stars"]
    with open("/tmp/trending.csv", "w", newline="") as f:
        f_csv = csv.DictWriter(f, headers, extrasaction='ignore')
        f_csv.writeheader()
        f_csv.writerows(rows)

with Flow("GitHub_Trending_Flow") as flow:
    since = Parameter("since")
    data = download_data(since)
    rows = handle_data(data)
    save_data(rows)

#flow.run(since="weekly")
flow.register(project_name="GitHub_Trending")

References

[1] Prefect:https://github.com/PrefectHQ/prefect
[2] 文档:https://docs.prefect.io/core/
[3] 文章:https://makeitnew.io/prefect-a-modern-python-native-data-workflow-engine-7ece02ceb396

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容