flink的任务也可以运行在yarn上面,将flnk的任务提交到yarn平台,通过yarn平台来实现我们的任务统一的资源调度管理,方便我们管理集群当中的CPU和内存等资源
依赖环境说明:
至少hadoop2.2版本及以上
hdfs以及yarn服务正常启动
flink on yarn又分为两种模式:
<v:shapetype id="_x0000_t75" stroked="f" filled="f" path="m@4@5l@4@11@9@11@9@5xe" o:preferrelative="t" o:spt="75" coordsize="21600,21600"><v:stroke joinstyle="miter"><v:formulas></v:formulas><v:path o:connecttype="rect" gradientshapeok="t" o:extrusionok="f"></v:path></v:stroke></v:shapetype><v:shape id="图片_x0020_7" style="width:415.2pt;height:268.8pt; visibility:visible;mso-wrap-style:square" type="#_x0000_t75" alt="说明: C:\Users\admin\Desktop\图片1.png" o:spid="_x0000_i1025"><v:imagedata o:title="图片1" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image001.png"></v:imagedata></v:shape>
1、第一种模式:单个yarn session模式
这种方式需要先启动集群,然后在提交作业,接着会向yarn申请一块资源空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,那下一个作业才会正常提交,实际工作当中一般不会使用这种模式
这种模式,不需要做任何配置,直接将任务提价到yarn集群上面去,我们需要提前启动hdfs以及yarn集群即可
启动单个Yarn Session模式
第一步:修改yarn-site.xml配置为文件
node01执行以下命令修改yarn-site.xml,添加以下配置属性
cd /kkb/install/hadoop-2.6.0-cdh5.14.2/etc/hadoop
vim yarn-site.xml
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
然后将修改后的配置文件拷贝到node02与node03服务器
node01执行以下命令进行拷贝配置文件
cd /kkb/install/hadoop-2.6.0-cdh5.14.2/etc/hadoop
scp yarn-site.xml node02:$PWD
scp yarn-site.xml node03:$PWD
然后重新启动yarn集群即可
第二步:修改flink配置文件
node01执行以下命令更改flink配置文件
cd /kkb/install/flink-1.8.1/conf
vim flink-conf.yaml
high-availability: zookeeper
high-availability.storageDir: hdfs://node01:8020/flink_yarn_ha
high-availability.zookeeper.path.root: /flink-yarn
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181
yarn.application-attempts: 10
hdfs上面创建文件夹
node01执行以下命令创建hdfs文件夹
hdfs dfs -mkdir -p /flink_yarn_ha
第三步:在yarn当中启动flink集群
直接在node01执行以下命令,在yarn当中启动一个全新的flink集群,可以直接使用yarn-session.sh这个脚本来进行启动
cd /kkb/install/flink-1.8.1/
bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]
我们也可以使用 --help 来查看更多参数设置
bin/yarn-session.sh –help
Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached mode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
-nl,--nodeLabel <arg> Specify YARN node label for the YARN application
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
as typing Ctrl + C.
-st,--streaming Start Flink in streaming mode
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
注意:如果在启动的时候,yarn的内存太小,可能会报以下错误
Diagnostics: Container [] is running beyond virtual memory limits. Current usage: 250.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing containerpid=6386,containerID=container_1521277661809_0006_01_000001
我们需要修改yarn-site.xml添加以下配置,然后重启yarn即可
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
第二步:查看yarn管理界面8088
访问yarn的8088管理界面,发现yarn当中有一个应用
yarn当中会存在一个常驻的application,就是为我们flink单独启动的一个session
第三步:提交任务
使用flink自带的jar包,实现单词计数统计功能
node01准备文件并上传hdfs
cd /kkb
vim wordcount.txt
内容如下
hello world
flink hadoop
hive spark
hdfs上面创建文件夹并上传文件
hdfs dfs -mkdir -p /flink_input
hdfs dfs -put wordcount.txt /flink_input
node01执行以下命令,提交任务到flink集群
cd /kkb/install/flink-1.8.1
bin/flink run ./examples/batch/WordCount.jar -input hdfs://node01:8020/flink_input -output hdfs://node01:8020/flink_output/wordcount-result.txt
第四步:验证Yarn Session的高可用
通过node01:8088这个界面,查看yarn session启动在哪一台机器上,然后杀死yarn session进程,我们会发现yarn session会重新启动在另外一台机器上面
找到YarnSessionClusterEntrypoint所在的服务器,然后杀死该进程
[hadoop@node02 ~]$ jps
10065 QuorumPeerMain
10547 YarnSessionClusterEntrypoint
10134 DataNode
10234 NodeManager
10652 Jps
[hadoop@node02 ~]$ kill -9 10547
杀死YarnSessionClusterEntrypoint进程之后,会发现,yarn集群会重新启动一个YarnSessionClusterEntrypoint进程在其他机器上面