flink on yarn模式

flink的任务也可以运行在yarn上面,将flnk的任务提交到yarn平台,通过yarn平台来实现我们的任务统一的资源调度管理,方便我们管理集群当中的CPU和内存等资源

依赖环境说明:

至少hadoop2.2版本及以上

hdfs以及yarn服务正常启动

flink on yarn又分为两种模式:

<v:shapetype id="_x0000_t75" stroked="f" filled="f" path="m@4@5l@4@11@9@11@9@5xe" o:preferrelative="t" o:spt="75" coordsize="21600,21600"><v:stroke joinstyle="miter"><v:formulas></v:formulas><v:path o:connecttype="rect" gradientshapeok="t" o:extrusionok="f"></v:path></v:stroke></v:shapetype><v:shape id="图片_x0020_7" style="width:415.2pt;height:268.8pt; visibility:visible;mso-wrap-style:square" type="#_x0000_t75" alt="说明: C:\Users\admin\Desktop\图片1.png" o:spid="_x0000_i1025"><v:imagedata o:title="图片1" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image001.png"></v:imagedata></v:shape>

1、第一种模式:单个yarn session模式

这种方式需要先启动集群,然后在提交作业,接着会向yarn申请一块资源空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,那下一个作业才会正常提交,实际工作当中一般不会使用这种模式

这种模式,不需要做任何配置,直接将任务提价到yarn集群上面去,我们需要提前启动hdfs以及yarn集群即可

启动单个Yarn Session模式

第一步:修改yarn-site.xml配置为文件

node01执行以下命令修改yarn-site.xml,添加以下配置属性

cd /kkb/install/hadoop-2.6.0-cdh5.14.2/etc/hadoop

vim yarn-site.xml

<property>

<name>yarn.resourcemanager.am.max-attempts</name>

<value>4</value>

<description>

The maximum number of application master execution attempts.

</description>

</property>

然后将修改后的配置文件拷贝到node02与node03服务器

node01执行以下命令进行拷贝配置文件

cd /kkb/install/hadoop-2.6.0-cdh5.14.2/etc/hadoop

scp yarn-site.xml node02:$PWD

scp yarn-site.xml node03:$PWD

然后重新启动yarn集群即可

第二步:修改flink配置文件

node01执行以下命令更改flink配置文件

cd /kkb/install/flink-1.8.1/conf

vim flink-conf.yaml

high-availability: zookeeper

high-availability.storageDir: hdfs://node01:8020/flink_yarn_ha

high-availability.zookeeper.path.root: /flink-yarn

high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181

yarn.application-attempts: 10

hdfs上面创建文件夹

node01执行以下命令创建hdfs文件夹

hdfs dfs -mkdir -p /flink_yarn_ha

第三步:在yarn当中启动flink集群

直接在node01执行以下命令,在yarn当中启动一个全新的flink集群,可以直接使用yarn-session.sh这个脚本来进行启动

cd /kkb/install/flink-1.8.1/

bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]

我们也可以使用 --help 来查看更多参数设置

bin/yarn-session.sh –help

Usage:

Required

-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)

Optional

-D <property=value> use value for given property

-d,--detached If present, runs the job in detached mode

-h,--help Help for the Yarn session CLI.

-id,--applicationId <arg> Attach to running YARN session

-j,--jar <arg> Path to Flink jar file

-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)

-m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.

-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)

-nl,--nodeLabel <arg> Specify YARN node label for the YARN application

-nm,--name <arg> Set a custom name for the application on YARN

-q,--query Display available YARN resources (memory, cores)

-qu,--queue <arg> Specify YARN queue.

-s,--slots <arg> Number of slots per TaskManager

-sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such

as typing Ctrl + C.

-st,--streaming Start Flink in streaming mode

-t,--ship <arg> Ship files in the specified directory (t for transfer)

-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)

-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)

-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode

注意:如果在启动的时候,yarn的内存太小,可能会报以下错误

Diagnostics: Container [] is running beyond virtual memory limits. Current usage: 250.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing containerpid=6386,containerID=container_1521277661809_0006_01_000001

我们需要修改yarn-site.xml添加以下配置,然后重启yarn即可

<property>

<name>yarn.nodemanager.vmem-check-enabled</name>

<value>false</value>

</property>

第二步:查看yarn管理界面8088

访问yarn的8088管理界面,发现yarn当中有一个应用

http://node01:8088/cluster

yarn当中会存在一个常驻的application,就是为我们flink单独启动的一个session

第三步:提交任务

使用flink自带的jar包,实现单词计数统计功能

node01准备文件并上传hdfs

cd /kkb

vim wordcount.txt

内容如下

hello world

flink hadoop

hive spark

hdfs上面创建文件夹并上传文件

hdfs dfs -mkdir -p /flink_input

hdfs dfs -put wordcount.txt /flink_input

node01执行以下命令,提交任务到flink集群

cd /kkb/install/flink-1.8.1

bin/flink run ./examples/batch/WordCount.jar -input hdfs://node01:8020/flink_input -output hdfs://node01:8020/flink_output/wordcount-result.txt

第四步:验证Yarn Session的高可用

通过node01:8088这个界面,查看yarn session启动在哪一台机器上,然后杀死yarn session进程,我们会发现yarn session会重新启动在另外一台机器上面

找到YarnSessionClusterEntrypoint所在的服务器,然后杀死该进程

[hadoop@node02 ~]$ jps

10065 QuorumPeerMain

10547 YarnSessionClusterEntrypoint

10134 DataNode

10234 NodeManager

10652 Jps

[hadoop@node02 ~]$ kill -9 10547

杀死YarnSessionClusterEntrypoint进程之后,会发现,yarn集群会重新启动一个YarnSessionClusterEntrypoint进程在其他机器上面

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容