写在前面的话
下文翻译的时候,本人对 Cloud-Native 概念理解比较粗浅,如今有了进一步的理解,希望大伙儿在看下面文章的时候能够抱着以下几点观念去读:
云原生是一套方法论,用来指导我们更高效地构建和运行应用程序,包括几个方面:
- DevOps:开发/运维协作、自动化、CI工具(构建集成、测试、部署)
- 持续交付:敏捷模式、快速迭代、快速反馈
- 微服务:服务地组织形式,独立性、扩展性、解耦性、规范性
- 容器:服务运行载体,Docker,Kubernetes
--------------------------------------分割线-------------------------------------------------
原文地址
Flink 在早些时候就支持像YARN
、Mesos
这种资源管理系统,然而这些系统既不是为那些越来越流行的云原生架构而设计,也无法满足人们处理更复杂、混合的工作内容(如批处理、流处理、深度学习、网络服务)的要求。 因此,越来越多的用户选择在 Kubernetes 上去自动部署、扩缩容他们的 Flink 应用。
随着版本的不断更新,Flink 社区在“Kubernetes原生集成”方面取得了显著的进步,从Active Resource Management
到“轻Zookeeper”型的HA
。在接下来内容中,我们将着重从技术细节层面描述"如何在Kubernetes上原生部署Flink",深挖Flink的HA架构。之后我们将会通过一个实际例子来指导你在Kubernetes上部署一个Flink应用集群,并使能HA。最后我们会总结一下在Kubernetes上原生部署Flink应用的优势,并对其未来的工作做个介绍。
在Kubernetes上原生集成Flink
在我们开始学习如何实现基于Kubernetes的HA服务的技术细节之前,首先让我大致介绍一下针对在Kubernetes上部署Flink这件事来说,原生,到底意味着什么:
- Flink是自给自足的。Flink客户端中嵌入了一个Kubernetes客户端,因此你无需借助额外的工具,如:Kubecel,在Kubernetes中创建一个Flink集群。
- Flink客户端会通过API Server与Kubernetes直接通讯来部署JobManager。Flink客户端这边的配置、log4j以及Hadoop配置会自动同步给JobManager的Pod。
- Flink的资源管理器会根据需求直接对接API Server来实现TaskManager POD的分配与发布。
总的来说,这与在其他资源管理系统(如YARN、Mesos)中集成Flink相似,就是直接与Kubernetes的API Server进行交互。
Kubernetes中的高可用服务
在将Flink引入生产环境的过程中,高可用是一个很常见的要求:避免单节点型Flink集群的故障问题。在1.12之前的版本中,Flink提供的Zookeeper HA服务被广泛应用于生产环境,并且支持单节点集群、YARN以及Kubernetes部署。然而,为了HA而在Kubernetes中再管理一个Zookeeper集群,带来了额外的操作开销,而这些开销本可以避免,因为Kubernetes也提供了一些公共接口来实现leader选举以及配置存储(如ConfigMap)。从Flink 1.12开始,我们促使了这些特性的引入,来帮助用户在Kubernetes上更方便地运行一个HA型的Flink集群。
上图描述了Kubernets中高可用Flink服务的架构,具体工作流程如下:
1、针对leader选举,首先有一系列的JobManagers被标记为候选者,他们全部向外声明自己是leader,但最终只有一个会成为激活的leader,之后这个激活的JobManager会持续发送心跳以向其他人更新其leader的地位。同时,其他JobManagers也会时不时地尝试成为leader——这保证JobManager的快速故障恢复。不同的组件(ResourceManagr、JobManager、Dispatcher、RestEndpoint)拥有独立的leader选举服务以及ConfigMap。
2、被激活的leader会在ConfigMap中发布自己的地址,需要注意的是,Flink使用ConfigMap来实现竞争锁以及leader地址存储。这确保了在定期的更新期间不会发生意料之外的变更。
3、leader接收服务用以发现各个激活态leader的地址并允许各个组件之后的注册事宜。举个例子,TaskManagers接收ResourceManager和JobManager的注册地址并提供slots。在接受服务中,Flink通过
Kubernetes Watch
来监听ConfigMap变化——当ConfigMap内容发生变化时,这通常意味着leader的变更,这样监听器就可以马上获得最新leader的地址。4、其他所有的元信息(如运行的jobs、job图表、已完成检查器以及检查器计数器)都被直接存储到了相应的ConfigMap中。只有leader可以更新ConfigMap。这些HA数据只有当Flink集群收到终止信号时才会被清理。请注意,存在ConfigMap中的只是指针数据,具体的数据存在DistributedStorage中。通过这种间接型的映射可以保持ConfigMap的文件大小(MB级的ConfigMap可以存储GB级的数据)。
样例:高可用集群
你需要一个运行状态的Kubernetes集群,并且正确配置好KubeConfig。
1、将Flink job制成Docker镜像:
FROM flink:1.12.1
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
使用上面的Dockerfile构建镜像,并推送到自己的镜像仓:
docker build -t <user-image> .
docker push <user-image>
2、启动一个Flink应用集群
./bin/flink run-application \
--detached \
--parallelism 4 \
--target kubernetes-application \
-Dkubernetes.cluster-id=k8s-ha-app-1 \
-Dkubernetes.container.image=<user-image> \
-Dkubernetes.jobmanager.cpu=0.5 \
-Dkubernetes.taskmanager.cpu=0.5 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=s3://flink-bucket/flink-ha \
-Drestart-strategy=fixed-delay \
-Drestart-strategy.fixed-delay.attempts=10 \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.12.1.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.12.1.jar \
local:///opt/flink/usrlib/my-flink-job.jar
3、访问Flink的可视化网页端并查看运行的job
通过查看Flink 客户端的日志,你可以看到JobManager可视化页面的地址,如:
2021-02-05 17:26:13,403 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink application cluster k8s-ha-app-1 successfully, JobManager Web Interface: http://192.168.64.21:32388
4、杀掉JobManager来模拟故障
kubectl exec -ti {jobnamanger_pod_name} -- /bin/sh -c "kill 1"
5、确认job从最近一次成功探测中恢复
刷新页面直到新的JobManager启动,之后检查其运行日志来确认job恢复情况
2021-02-05 09:44:01,636 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 00000000000000000000000000000000 from Checkpoint 101 @ 1612518074802 for 00000000000000000000000000000000 located at <checkpoint-not-externally-addressable>.
6、取消job
job可以在页面上取消,也可以通过命令行取消:
./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID>
注意,当job被取消后,Flink创建的所有Kubernetes资源(JobManager deployment、TaskManager pod,Service,Flink Configuration ConfigMap以及leader相关的ConfigMap)都会被自动删除。
总结
Flink的Kubernetes原生集成是1.10版本开始的,在主机名复杂性,配置、管理以及操作Flink集群方面做了很多的云原生环境运行的抽象工作。经历过3次主要版本的发布之后,Flink社区在支持多发布模式(会话、应用)方面取得了显著的进步,同时,提供了一种不依赖Zookeepers的HA可选项模式。
与单节点Kubernetes集群相比,原生集成方式对用户更友好,并且对Kubernetes的上游知识要求更少。随着Flink对底层Kubernetes集群了解的加深,将来还可以利用其动态资源配置来更有效地利用Kubernetes资源。Flink下一步将会从Pod模板方面来加深其Kubernetes原生支持,使得Flink能够更灵活的使用Kubernetes集群的高级资源(Volumes,初始化容器,sidecar容器等)。这部分工作已经在进行,并且在1.13版本中将会呈现。