背景知识
Flink执行Application可以有三种方式
- Application Mode
- Per-Job Mode
- Session Mode。
以上三种方式的区别如下:
1. 集群生命周期以及资源隔离保证不同。
2. application的main方法是在客户端执行还是在集群执行。
下面是flink官网三种方式区别的截图
本文主要针对Session Mode方式进行介绍。
Flink Session Mode 提交
1. yarn-session启动集群
提交命令:
bin/yarn-session.sh -D yarn.container-start-command-template="/usr/local/jdk1.8/bin/java %jvmmem% %jvmopts% -Djob_name=test_job %logging% %class% %args% %redirects%" -d -n 1 -tm 1024 -jm 1024 -qu test.queue01 -s 1 -nm test_job
入口类 FlinkYarnSessionCli
主要做的功能内容:
- 加载flink-conf.yaml配置
- 初始化YarnClient,并启动
- 构建YarnClusterDescriptor,主要是为on yarn部署flink集群,设置部署相关信息。比如yarnClient, yarnQueue, flinkJarPath, configurationDirectory, flinkConfiguration, detached等。
- 创建application
- 初始化文件系统,并上传用户文件
- 提交application master,也就是部署集群。对应代码:yarnClient.submitApplication(appContext)。
- 返回RestClusterClient对象。
2. flink run提交作业到集群
提交命令:
bin/flink run -m yarn-cluster -c ${main_class} -p 1 -yid ${application_id} -d ${user_jar_path}
入口类 CliFrontend
主要做的功能内容:
加载flink-conf.yaml配置
构建YarnClusterDescriptor,与上面的yarn-session部分相同
开始运行program,执行用户的main方法
获取JobGraph
4.1 前提:streamGraph已存在,streamGraph是根据用户代码生成的
4.2 为streamGraph的StreamNode生成hash值,这个值以后作为JobVertexID来唯一标识节点
4.3 设置task chain关系
4.3.1 是否可chain,代码逻辑在StreamingJobGraphGenerator.isChainable方法中
- 下游算子的输入只有一个
- 下游算子不为空
- 上游算子不为空
- 上游和下游算子的slotSharingGroup相同
- 下游算子的ChainingStrategy为ALWAYS
- 上游算子的ChainingStrategy为或者HEADALWAYS
- 上下游算子中间的edge的Partitioner是ForwardPartitioner
- 上下游算子的并发相同
- streamGraph配置是可以chain的
4.3 设置PhysicalEdges
4.4 设置SlotSharingAndCoLocation,就是把streamGraph StreamNode的SlotSharingGroup属性设置到JobVertex中
4.5 配置checkpoint。主要就是设置triggerVertices(所有的输入节点),commitVertices(所有的节点),ackVertices(所有的节点)
4.6 设置执行配置信息,比如默认并发度,失败时retry次数,retry delay等-
提交job,把JobGraph提交给dispatcher
对应的源码如下:
接下来说下怎么把JobGraph提交给dispatcher的:
5.1 首先执行RestClusterClient的submitJob(@Nonnull JobGraph jobGraph)方法。client主要是构造SubmitRequest,包括jobgraph和用户jar文件。然后会调用sendRetriableRequest
5.2 调用RestClient的submitRequest。备注:RestClient是和RestServerEndpoint进行配对的,也就是RestClient的request会在RestServerEndpoint进行处理。
5.3 RestServerEndpoint在start方法中,会初始化很多handler,其中有一个JobSubmitHandler,这个handler做的就是把job提交到flink集群。接下来可以看JobSubmitHandler的handleRequest方法,处理job提交请求。
5.3.1 加载JobGraph
5.3.2 上传JobGraph以及其他的用户jar,上传主要用到BlobServer
5.3.3 submitJob,把job提交给dispatcher。接下来就可以根据dispatcher的逻辑,看作业提交之后的逻辑。
JobSubmitHandler.png
至此,作业提交给Dispatcher分析完成。