Flink on Yarn模式启动流程分析

 Flink On Yarn 架构



Flink Cluster on Yarn启动过程中,大体可以分为二个阶段

1. Filnk Client发起请求,申请启动Flink Cluster on Yarn

2. Yarn RM接收请求,并指定NM分配Container启动Flink Cluster.

(需要配置YARN_CONF_DIR, HADOOP_CONF_DIR ,HADOOP_CONF_PATH其中一个用来确保Flink能够访问HDFS和Yarn的RM。)


主要启动流程

1. 进程

首先我们通过下面的命令行启动flink on yarn的集群(Flink Client发起请求)

安装Flink:只需在一台可以连接至Yarn & HDFS集群的任意节点安装即可

启动脚本(命令):./bin/yarn-session.sh -n {num} -jm {num} -tm {num}

运行实例:yarn-session.sh中运行的最后命令是:java …org.apache.flink.yarn.cli.FlinkYarnSessionCli

/opt/meituan/flink-1.6.2/bin/yarn-session.sh -D yarn.container-start-command-template="/usr/local/jdk1.8.0_112/bin/java %jvmmem% %jvmopts% -Djob_name=appflow_log_fact -DjobID=10088 -Dengine_type=FLINK -Djob.suffix.topic=appflow-rtdw %logging% %class% %args% %redirects%" -d -n 120 -tm 4096 -jm 8000 -qu hadoop-rt.queue01 -s 1 -nm appflow_log_fact

这里将产生总共五个进程

** 1个FlinkYarnSessionCli ---> Yarn Client **

** 1个YarnApplicationMasterRunner ---> AM + JobManager**

3个YarnTaskManager --> TaskManager

即一个客户端+4个container,1个container启动AM,3个container启动TaskManager。

2.启动流程

1.FlinkYarnSessionCli 启动的过程中首先会检查Yarn上有没有足够的资源去启动所需要的container,如果有,则上传一些flink的jar和配置文件到HDFS,这里主要是启动AM进程和TaskManager进程的相关依赖jar包和配置文件

简单描述FlinkYarnSessionCli的主要内容

1) 根据FLINK_CONF_DIR & (YARN_CONF_DIR | HADOOP_CONF_DIR) load相关配置

2) 创建yarnClient,并申请一个applicationId

3) 将Flink集群运行所需要的Jar & Conf PUT至HDFS上

4) 封装ApplicationMaster启动需要的Env & Cmd至Request对象中,并用yarnClient对象发起请求,等待响应

5) 确认启动成功后,将重要信息封装成properties文件,并持久化至本地磁盘

注意事项:

步骤三中的HDFS路径,默认为:/user/{user}/.flink/{applicationId}

- 如果HDFS没有为该user创建 /user/{user} 目录,将抛出异常

- 由于该步骤中需要使用到applicationId,所以需要先通过yarnClient申请applicationId

步骤四才会真正的向Yarn申请资源运行ApplicationMaster

- AM并不是Yarn的接口实现类,而是封装至Context中的启动命令 & 环境变量等相关信息

启动成功后生成的properties文件中 最重要的信息为applicationId,将在之后的Flink Job提交时用于查找Cluster信息

- properties文件持久化路径,默认为:/tmp/.yarn-properties-{user}

- 如果在一个节点启动多个Session,则需要注意这个文件位置(目前还未研究)

2.接着yarn client会首先向RM申请一个container来 ApplicationMaster(YarnApplicationMasterRunner进程),然后RM会通知其中一个NM启动这个container,被分配到启动AM的NM会首先去HDFS上下载第一步上传的jar包和配置文件到本地,接着启动AM;在这个过程中会启动JobManager,因为JobManager和AM在同一进程里面,它会把JobManager的地址重新作为一个文件上传到HDFS上去TaskManager在启动的过程中也会去下载这个文件获取JobManager的地址,然后与其进行通信;AM还负责Flink的web 服务,Flink里面用到的都是随机端口,这样就允许了用户能够启动多个yarn session。

启动命令

两组件,三阶段

1) RM接收请求,并查询可用NM,并使其启动Container来运行AM

2) NM接收调度,并依据信息相关信息,将Jar & Conf从HDFS下载至Local,同时还依据Cmd & Env在本地生成launcher脚本

3) 通过运行launcher脚本,来启动ApplicationMaster(从源码中可以发现,Flink Client发送来的Cmd为:java … YarnSessionClusterEntrypoint)

简单描述FlinkSessionClusterEntrypoint的主要内容

1) 启动基于Akka的 RPC Service & Metric Register Service

2) 启动HA Service & Heartbeat Server

3) 启动BLOB Server & ArchivedExecutionGraphStore (会在Local创建临时目录用于存储)

4) 启动Web Monitor Service(任务管理平台)

5) 启动JobManager服务,用以管理TaskManager进程

注意事项:

1) 步骤二中用于存储Jar & Conf以及launcher脚本的地址为:/data/hadoop/yarn/local/usercache/{user}/appcache/application_{applicationId}/container_{applicationId}_…,其中包含一下内容 :

    - launch_container.sh

    - 启动命令 & 环境变量

    - flink-conf.yaml & log配置文件 – 启动配置 & 日志配置

    - flink.jar & lib – 运行依赖Jar

2) 步骤三中运行YarnSessionClusterEntrypoint,以此来启动JobManager,而后的TaskManager,则有JobManager来启动并管理

    - 实际上,在on Yarn模式下,TaskManager的启动 是推迟到了Filnk Job的调度发起的时候,并且,当一段时间没有接收到Job时,TaskManager将自动退出,释放资源 

3.AM 启动完成以后,就会向RM申请container去启动TaskManager,启动的过程中也是首先从HDFS上去下载一些包含TaskManager(yarn模式的话这里就是YarnTaskManager )主类 的jar和启动过程依赖的配置文件,如JobManager地址所在的文件,然后利用java cp的方式去启动YarnTaskManager ,一旦这些准备好,就可以接受任务了。这个和spark on yarn的yarn cluster模式其实差不多,也是分为两个部分,一个是准备工人和工具(spark是启动sc的过程,flink是初始化ENV的过程),另外一个就是给工人分配具体工作(都是执行具体的操作,action什么的触发)。

启动命令:

进程信息

** FlinkYarnSessionCli **

/home/hadoop/ym/jdk1.8.0_101/bin/java -Xmx512m -classpath /home/hadoop/ym/flink-1.1.3/lib/flink-dist_2.10-1.1.3.jar:/home/hadoop/ym/flink-1.1.3/lib/flink-python_2.10-1.1.3.jar:/home/hadoop/ym/flink-1.1.3/lib/log4j-1.2.17.jar:/home/hadoop/ym/flink-1.1.3/lib/slf4j-log4j12-1.7.7.jar::/home/hadoop/ym/hadoop-2.7.1/etc/hadoop: -Dlog.file=/home/hadoop/ym/flink-1.1.3/log/flink-xxxuser-yarn-session-db-180.photo.163.org.log -Dlog4j.configuration=file:/home/hadoop/ym/flink-1.1.3/conf/log4j-yarn-session.properties -Dlogback.configurationFile=file:/home/hadoop/ym/flink-1.1.3/conf/logback-yarn.xml org.apache.flink.yarn.cli.FlinkYarnSessionCli -j /home/hadoop/ym/flink-1.1.3/lib/flink-dist_2.10-1.1.3.jar -n 3 -jm 1024 -nm 1024 -st

** YarnApplicationMasterRunner **

/home/hadoop/ym/jdk1.8.0_101/bin/java -Xmx424M -Dlog.file=/home/hadoop/ym/hadoop-2.7.1/hadoop/nm/application_1480493133223_0009/container_1480493133223_0009_01_000001/jobmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnApplicationMasterRunner

**个YarnTaskManager **

/home/hadoop/ym/jdk1.8.0_101/bin/java -Xms424m -Xmx424m -XX:MaxDirectMemorySize=424m -Dlog.file=/home/hadoop/ym/hadoop-2.7.1/hadoop/nm/application_1480493133223_0009/container_1480493133223_0009_01_000003/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskManager --configDir .

其他说明:

日志管理

    FlinkYarnSessionCli的启动,由Client发起,归属于Flink管理,所以日志内容存储在Flink安装目录的log/

    YarnSessionClusterEntrypoint的启动,又Yarn发起,归属于Yarn管理,所以日志内容存储在Yarn管理的目录/data/hadoop/yarn/log/…

进程管理

    FlinkYarnSessionCli进程由Flink管理,YarnSessionClusterEntrypoint进程由Yarn管理

    当不通过FlinkYarnSessionCli来stop YarnSessionClusterEntrypoint时,需要使用yarn application -kill …,但是这种方式无法清理由FlinkYarnSessionCli管理和控制的资源,如:/tmp/.yarn-properties-{user}

    发起yarn application -kill …命令,请求停止Cluster时,会先停止TaskManager,然后停止JobManager,但是不会清理HDFS上的缓存

    通过FlinkYarnSessionCli的interact模式,可以对*/tmp/.yarn-properties-{user}* & HDFS缓存统一进行清理

Job提交

    这种模式下,Client将从本地查找/tmp/.yarn-properties-{user}配置,以获取applicationId来定位Cluster,所以Job提交最好是在FlinkYarnSessionCli的启动节点,否则需要指定applicationId

集群安装

    on Yarn模式下,Flink只需要安装至 一个节点,因为后续的进程,都会从HDFS上获取Jar & Conf来进行启动


官网文档

The YARN client needs to access the Hadoop configuration to connect to the YARN resource manager and to HDFS. It determines the Hadoop configuration using the following strategy:

Test if YARN_CONF_DIR, HADOOP_CONF_DIR or HADOOP_CONF_PATH are set (in that order). If one of these variables are set, they are used to read the configuration.

If the above strategy fails (this should not be the case in a correct YARN setup), the client is using the HADOOP_HOME environment variable. If it is set, the client tries to access $HADOOP_HOME/etc/hadoop (Hadoop 2) and $HADOOP_HOME/conf (Hadoop 1).

When starting a new Flink YARN session, the client first checks if the requested resources (containers and memory) are available. After that, it uploads a jar that contains Flink and the configuration to HDFS (step 1).

The next step of the client is to request (step 2) a YARN container to start the ApplicationMaster (step 3). Since the client registered the configuration and jar-file as a resource for the container, the NodeManager of YARN running on that particular machine will take care of preparing the container (e.g. downloading the files). Once that has finished, the ApplicationMaster (AM) is started.

The JobManager and AM are running in the same container. Once they successfully started, the AM knows the address of the JobManager (its own host). It is generating a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The file is also uploaded to HDFS. Additionally, the AM container is also serving Flink’s web interface. All ports the YARN code is allocating are ephemeral ports. This allows users to execute multiple Flink YARN sessions in parallel.

After that, the AM starts allocating the containers for Flink’s TaskManagers, which will download the jar file and the modified configuration from the HDFS. Once these steps are completed, Flink is set up and ready to accept Jobs. 

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容