从旧系统迁移到新系统总是一个痛苦的过程,这篇文章介绍了Pinterest怎样定义自动化的迁移层帮助用户从旧系统迁移到新的工作流系统。原文:Spinner: The Mass Migration to Pinterest’s New Workflow Platform[1]
我们在上一篇文章中讨论了如何做出决定并采取行动,将旧系统Pinball[2]迁移到j基于Apache Airflow的新系统Spinner上。提醒一下,我们从Airflow 1.10-stable版本创建了自定义分支,并且从主干上cherry-pick了部分功能。
本文将介绍我们如何设计和完成迁移,明确需求,并与工程师团队协调,无缝的将3000多个工作流迁移到Airflow。我们将深入探讨所做的权衡,但在此之前,我们先介绍一下学到的经验教训。
按照我们的标准,一个成功的迁移过程的关键是:
- 理解并填补之前的内部工作流系统与Airflow之间的差距,确定特性差异、安全差异和用户习惯使用的术语。
- 提供以低成本方式同时大规模迁移工作流的工具,并提供验证方法。
- 与用户进行清晰持续的沟通,并提供相关材料,如wiki、培训课程、积极的用户支持和公告。
- 启用无状态UI分区调度器,支持kubernetes集成,以提供定制化、可伸缩的解决方案。
- 提供清晰的CI/CD流水线,帮助系统一致、可靠、高效的维护多个基础架构分支。
- 严格测试,在预发环境里进行单元测试和集成测试,以阻止破坏性变更,并在部署时采取谨慎的方法。
- 维护运行状况检查和综合指标,并在负载增加时对告警进行微调。
最需要避免的问题包括:
- 确保迁移之前和之后的调度一致。由于定义调度器的方式不同(旧系统并非完全基于cron),因此旧系统和Spinner系统的调度间隔并不总是一致的。因此,要防止操作被忽略或者被过度操作。
- 为每个任务分配内存等资源,以防止任务启动失败。
- Kubernetes pod的初始化成本是非预期的,pod启动延时确实有不小的成本,必须被团队内所有用例所接受。
- Kubernetes pod内冗余的sidecar会增加延时与网络问题,并会增加工作流的调度延时。
- 在用户教育和支持方面的投资可能会很高。
- 为维护旧的DSL和新的Airflow DSL而增加的混合解决方案的成本开销并不低。
接下来看看我们是怎么解决这些挑战的。
方法与需求
我们将平台的迁移需求定义为:
- 用户只需做出最少的代码更改
- 迁移时不需要中断生产工作流的执行
- 设置遗留系统的下线日期并完成系统下线
考虑到这些需求,我们可以有两种方式进行迁移:
- 请求工作流的所有者在Arflow DSL中重写旧工作流,并在这个转换过程中提供支持
- 平台提供直接进行DSL转换的工具
使用方法1,可以减少我们和用户的技术债务,平台不必维护额外的基础设施,但由于所有定制的用户逻辑和依赖关系已经被添加到遗留的Pinball任务中,因此存在一些挑战。不过即使没有这些挑战,我们也没有让用户接受这个提议,因为每个团队都得花费大量工程时间来做迁移,而这些都是成本。最后,由于需要依赖用户来完成工作,可能会推迟遗留系统的下线时间,因此这种方式并不可行。
因此,我们采用的方法更接近方法2:我们在Airflow调度器中构建了一个迁移层,动态解析DAG文件,将遗留工作流系统中的工作流定义转换为Airflow的DAG。这意味着不需要用户修改代码,为用户提供了透明的迁移体验。遗留工作流中的每个作业都被转换为包装操作器类型,该类型是专门为支持工作流迁移用例而实现的。在执行期间,操作器启动新的k8s pod,使用遗留系统的镜像启动遗留作业的实际逻辑。通过这种方式,我们可以为迁移后的任务模拟遗留系统的执行环境。
迁移层
重申一下,这个项目的目标是用最少的用户工作量尽量透明的推动工作流迁移。下图展示了这一过程的端到端体验,后续我们将更深入研究各个组件。
左侧的组件是Pinterest迁移调度程序,这是基于原生Airflow调度器构建的,并利用了之前编写的多分区调度器。
PinterestDagBag
当调度器在迁移模式下启动时,使用定制的DagBag类,命名为PinterestDagBag,其负责从迁移元数据文件(而不是python的DAG文件)解析DAG。为了更好理解,我们需要介绍之前的Pinball[2]系统是如何工作的。
Pinball有令牌(Token)的概念:当Pinball工作流做好运行准备,工作流解析器将把工作流定义转换成令牌,存储所有必需的运行时信息。PinterestDagBag从遗留系统的工作流解析器中检索工作流定义(也就是令牌),遗留系统托管在名为令牌获取器(Token Fetcher)的容器中。然后将传统的工作流定义转换为原生Airflow DAG和运行中的任务(例如操作器或感应器)。
完成这种抽象和转换(不需要DAG文件)的方法实际上非常简单,一个DAG文件本质上只是一个或多个DAG的标识符。对于原生Airflow来说,DAG文件恰好携带了工作流定义,但完全有可能以一种不直接包含工作流定义,而是指向定义所在源代码的方式来组成DAG文件。我们编写了一个“dag文件”,表示遗留工作流定义的托管位置,并确保定制的PinterestDagBag模块能够从中解析出DAG对象。迁移元数据文件的示例如下:
{
“cluster_name”: “core001”,
“workflow_name”: “test_workflow”,
“migration_date”: “2020–01–01 00:00:00”
}
调度器能够发现和处理的元数据是在工作流迁移启动时生成的(我们将在后面详细描述),每个迁移元数据文件都表示如何在Token Fetcher容器的帮助下获取遗留工作流的定义,下一节将讨论这部分内容。
Token Fetcher
一旦迁移调度器发现元数据,Token Fetcher容器就开始发挥作用,运行遗留系统的解析器,并与迁移调度器一起工作,其开放API可以用来检索遗留工作流规范以及解析作业。遗留工作流中的每个作业都被解析成一个作业令牌数据结构,其中包含规范,以及最重要的作业执行命令,如下所示:
python data_job_runner.py — job_full_class=reporter.examples.ExampleSparkJob — executor=prod_011 — job_args=”end_date=2021–12–30"
通过Toke Fetcher容器,PinterestDagBag模块可以调用相应的API,根据迁移元数据文件检索工作流规范和作业令牌。
PinboardOnK8sOperator
在深入研究这个特殊的操作器之前,我们回顾一下Pinboard是什么。在上一篇文章里,我们介绍过Pinboard是Pinterest的python代码单一存储库,在之前的系统中,所有工作流和作业都定义在这个存储库里。
一旦我们获得了来自Token的数据,就使用定制的PinboardOnK8sOperator操作器来包装遗留作业的Token抽象。每个作业令牌被转换为该操作器的一个实例,存储从检索到的令牌中获取的执行命令。在执行期间,启动一个k8s pod,加载pinboard来执行命令,以模拟遗留系统的作业执行环境。这也可以防止对Airflow执行环境造成任何干扰。
Airflow的序列化DAG特性被用来序列化迁移后的DAG和任务,有助于减少DAG的解析开销。PinterestDagBag只在工作流的序列化DAG不存在时才调用Token Fetcher来检索作业Token并进行转换。同样,当迁移的DAG的要被调度执行时,DagFileProcessor再次调用Token Fetcher来检索最新的作业Token并刷新序列化的DAG。这个序列化DAG也用在UI渲染中,所以不需要在Web服务器上启动Token Fetcher容器。此外,由于执行PinboardOnK8sOperator所需的属性都是可序列化的,所以在执行迁移任务时也使用了序列化DAG特性。
迁移工具
为了简化工作流迁移的过程,我们构建了一个UI工具,让用户可以将现有的工作流迁移到Airflow。只需几次单击,就可以在旧系统上停止调度工作流,并将其调度到新的Spinner集群上。一旦工作流被迁移,迁移元数据文件将被上传到s3,并且可以被迁移调度器发现。该工具还支持迁移回遗留系统、发布高级迁移报告并提供管理员角色,以帮助用户管理迁移记录。
我们还将迁移API公开给其他系统的下游服务,帮助这些系统用可编程的方式构建工作流。
通过这个工具,迁移工作流只需要几分钟时间,而不需要用户花上几个小时重写代码,减轻了用户负担。用户只需要登录UI,选择工作流,将其调度到Spinner中运行,验证输出是否有效(这需要手动操作),最后通过关闭迁移记录来结束迁移工作。这个工具对于平台和用户来说是非常重要,如果没有它,我们就不可能在一年内完成迁移的目标。
动态DAG
我们的迁移工作需要但是Airflow不支持的一个主要功能是动态DAG。动态DAG可以根据调度器处理的不同动态生成不同的DAG布局。例如,如果DAG布局是基于某些外部服务或数据的状态生成的,那么就会和从DAG文件加载的布局有所不同,并且和解析DAG文件的时间相关。我们期望当新的DAG被调度执行时,就能够确定DAG布局。计算出来的布局将会被保存下来,并且DAG的执行将会与保存的布局保持一致。worker可以基于保存的布局加载任务,而不需要再次进行DAG解析,而再次解析可能会生成一个不同的DAG布局。
Airflow原生并不支持这个功能,其潜在问题就在于当worker试图执行DAG任务时,从DAG文件检索到的布局和DAG执行是创建的布局是不一致的。在这种情况下,worker无法从DAG获得特定的任务。
我们基于Airflow构建了这一功能,设计了一种名为DynamicDAG的新型DAG,并公开了compute_layout方法。任务实例化逻辑封装在compute_layout方法中,而不是在DAG文件的最外层定义任务。这个方法只在创建新的DAG运行时被调用生成DAG布局,布局快照将被保存并绑定到这个DAG运行中,所以当需要获得某个特定DAG运行时的任务时,系统能够从保存的DAG布局中检索,而不是从DAG文件中加载。下面的代码片段展示了如何使用DynamicDAG接口构建动态DAG。
dag = DynamicDAG(
dag_id=”dynamic_test”,
compute_layout=compute_layout,
skip_early_layout_compute=True,….
)
def compute_layout(dynamic_dag: DynamicDAG, execution_date: datetime = None, dagrun_conf: dict = None) -> None:
“””
Compute layout for dynamic DAG
“””
# Use random int
rand_int = random.randint(1, 3)
for i in range(rand_int):
python_task_1 = dynamic_dag.add_task_into_dynamic_dag(operator_class=PythonOperator,
task_id=f’python_task_{i}’,
python_callable=python_callable,
op_kwargs={‘task_id’: f’python_task_{i}’,
‘execution_date’: execution_date})
python_task_2 = dynamic_dag.add_task_into_dynamic_dag(operator_class=PythonOperator,
task_id=f’python_task_{i}_v2',
python_callable=python_callable,
op_kwargs={‘task_id’: f’python_task_{i}’,
‘execution_date’: execution_date})
python_task_1 >> python_task_2
请注意,虽然我们是为了帮助迁移而创建了这个类,但它也适用于本地工作流,我们可以将这个类用于业务逻辑。
我们修改了Airflow中的主要组件,如调度器、Web服务器和执行器,以支持我们的版本特性,下面的流程图显示了有/没有DAG版本特性的调度器处理逻辑的差异。在新的设计中,调度器模块有两个主要的变化:
- DAG布局将在DAG运行时创建期间被重新生成并被序列化,所以一个新创建的DAG运行时将总是被绑定到一个特定的DAG布局。
- 当调度器处理特定版本的DAG布局时(例如验证DAG完整性或调度任务实例),系统将加载并反序列化DAG布局。通过这种方式,确保总是能够调度和执行正确版本的DAG。
Kubernetes对迁移的支持
概述
基础设施层利用内部Kubernetes集群,从而提供“无限”的可伸缩性、与其他任务的隔离、易于维护和升级,以及改进的安全性。
在上图中,可以看到迁移的任务用例经历了两个迭代。迁移的任务用例有一个Airflow的工作pod,然后启动一个迁移的任务pod来加载调用和运行命令所需的环境。这种pod over pod场景增加了额外的2-4分钟的启动时间,可能会给用户作业带来沉重的成本。稍后,我们将介绍增强的迁移任务用例,可以在原来的工作pod中运行迁移后的pod逻辑,从而节省了启动第二个pod的成本。执行增强的迁移任务工作pod时,生命周期如下所示。
迁移任务工作Pod
Airflow工作容器启动Airflow任务命令,生成迁移后的任务命令,该命令将被发送到pinboard容器,而pinboard容器只是一个可以调用旧DSL逻辑并返回输出状态的容器。Airflow工作容器只是监视pinboard容器的活跃度,等待其退出并返回状态。从用户的角度来看,当UI试图获取活动任务日志时,它是一个独立进程,使用kubernetes API从主机提取日志。Airflow工作容器轮询查询状态,直到任务完成。
Pod构建
最后我们介绍一下工作Pod的生成方式。
下面会更详细的解释帮助容器生成规范yaml的不同组件及其各自的任务操作。在Kubernetes执行器中,当一个任务计划运行时,会生成Airflow工作Pod规范。因为是迁移任务,也会生成pinboard容器规范,并将其合并到迁移的Airflow工作Pod规范中。最终,该规范将提交给kubernetes集群,以启动一个带有工作容器和pinboard容器的Airflow工作Pod。
从序列图中,你可能还会注意到一些资源分配步骤。在kubernetes环境中,我们需要预定义Pod的资源。因此,我们还利用一些历史数据以及可以直接从UI更新为配置的托管数据来帮助我们为每个迁移的任务进行智能资源分配。我们在内部创建了一个流程来跟踪任务Pod的资源使用情况,以便更好理解他们的行为,并最大限度的节约资源。
部署
如前一节所述,在执行迁移期间,将启动单独的k8s pod/容器,使用遗留系统的镜像(即pinboard镜像)运行实际业务逻辑,从而确保任务的行为在迁移后保持不变。因此,我们构建了专用的CI/CD流水线来生成、验证和发布镜像。
迁移后的pinboard镜像的部署生命周期遵循以下步骤:
- 触发Jenkins作业,基于最新的提交构建pinboard镜像。我们的Teletraan[3]管理工具会按预定节奏触发,或者手动触发。
- 发布工件,另一个Jenkins作业会查看是否发布了任何变更,如果有变更,就运行DAG单元测试并发布预发镜像。
- 然后,定期执行的验证工作流将预发镜像发布到金丝雀环境中,并执行一组触发作业,这些作业触发其他监测工作流来验证预发镜像,并查询使用ExternalTaskSensors的预发镜像的状态。这些金丝雀工作流是为了测试旧系统中常见的作业类型,以覆盖最常用的操作器。金丝雀工作流套件还包括用户贡献的工作流,以保护他们的关键工作流不受可能破坏其流水线的问题镜像的影响。一旦所有的感应器任务接收到成功状态,就发布生产镜像,供Web服务器、调度器和worker在生产中使用。如果在金丝雀验证测试期间出现故障,工作流团队将得到自动通知,并需要手动检查问题以纠正并重新部署。
这个部署流水线还允许发布热修复版本,以保护所有用户在几个小时后不受完整部署的影响,而只是发布一个特定的提交。单一存储库有时具有复杂的依赖,可能导致许多意想不到的任务失败。金丝雀验证流水线允许我们在任何变更影响到生产环境之前捕获潜在的问题。
DAG文件同步
正如迁移工具一节中提到的,Spinner自动迁移工具生成一个将发布到s3的迁移元数据文件,该文件是调度器的标识符,用于查找迁移后的工作流和作业令牌。同步服务在Airflow Web服务器和调度器上运行,同步主机上的迁移元数据文件与来自s3的DAG文件,并且也基于调度器层和分区号。正如在上一篇文章中提到的,有多个调度器同时服务于迁移工作流和原生工作流,但是一个调度器只能其中一种DAG,不能同时处理两者。任何新的迁移元数据文件都会在8秒内同步到调度器,然后由PinterestDagBag模块处理。下面是我们现有的迁移调度程序分区。
指标
在工作流迁移项目开始时,系统上运行的大多数工作流都是从遗留系统迁移过来的。为了度量系统的健康程度,需要更重视这些工作流。正如在上一篇文章中提到的,我们的系统级SLO是跨多个集群承载的所有调度器的聚合加权平均值。因此,迁移后的调度器具有更高的权重,因为包含更多和更高层次的工作流。SLO是通过每15分钟为每个调度程序运行一个预定的DAG来测量的,这个DAG会发出统计信息。如果任何指标丢失了一个点,加权平均值将下降,我们测量该指标的总体正常运行时间不会低于98%。任何指标丢失一个数据点,都会通知工作流团队,除非速率低于阈值(通常意味着更大的问题),并不会通知所有用户。
结束语
我们与其他想要探索如何增强主要组件来定制业务需求的Airflow爱好者分享我们的发现,我们采用基本的Airflow系统,加入自定义修改,支持其与我们的迁移层协同工作,协调客户工作流。
希望这篇关于如何从旧系统迁移到Pinterest Airflow系统的文章对你有所帮助。
References:
[1] Spinner: The Mass Migration to Pinterest’s New Workflow Platform: https://medium.com/pinterest-engineering/spinner-the-mass-migration-to-pinterests-new-workflow-platform-997d9243f56a
[2] Pinball: Building workflow management: https://medium.com/@Pinterest_Engineering/pinball-building-workflow-management-88a044c9b9e3
[3] Teletraan: https://github.com/pinterest/teletraan
你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。
微信公众号:DeepNoMind