Flink部署及作业提交(On YARN)

Hadoop环境快速搭建

官方文档:

在上一篇 Flink部署及作业提交(On Flink Cluster) 文章中,我们介绍了如何编译部署Flink自身的资源分配和管理系统,并将作业提交到该系统上去运行。但通常来讲这种方式用得不多,因为在企业中,可能会使用不同的分布式计算框架,如Spark、Storm或MapReduce等。

如果每一种框架都需要搭建各自的资源分配和管理系统,就无法共享资源,导致资源利用率低。并且大多企业一般会使用Hadoop生态的相关组件做作为大数据处理平台的底座,如HDFS、Hive、YARN等。

其中 YARN 是资源调度框架、通用的资源管理系统,可以为上层应用提供统一的资源管理和调度,Spark、Flink、Storm等计算框架都可以集成到 YARN 上。如此一来这些计算框架可以享受整体的资源调度,进而提高集群资源的利用率,这也就是所谓的 xxx on YARN。因此,绝大部分企业都是将计算作业放到 YARN 上进行调度,而不是每种计算框架都单独搭一个资源分配和管理系统。这也是为什么要单独介绍Flink On YARN的原因。

想要让Flink作业跑在 YARN 上,我们首先得搭建一个Hadoop环境,为了简单这里只搭建单节点环境。我这里使用的是CDH的Hadoop发行版。下载地址如下:

首先需要安装好Java运行环境,由于比较简单这里就不演示了:

[root@hadoop01 ~]# echo ${JAVA_HOME}
/usr/local/jdk/11
[root@hadoop01 ~]# java -version
java version "11.0.8" 2020-07-14 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
[root@hadoop01 ~]# 

配置hosts,将主机名与本地ip建立一个映射关系:

[root@hadoop01 ~]# vim /etc/hosts
192.168.243.142   hadoop01

关闭防火墙:

[root@hadoop01 ~]# systemctl stop firewalld && systemctl disable firewalld

配置免密登录:

[root@hadoop01 ~]# ssh-keygen -t rsa      # 生成密钥对
[root@hadoop01 ~]# ssh-copy-id hadoop01    # 拷贝公钥并追加到自己的授权列表文件中

然后就可以开始安装Hadoop了,这里采用 hadoop-2.6.0-cdh5.16.2 版本作为演示,复制下载链接到系统上进行下载:

[root@hadoop01 ~]# cd /usr/local/src
[root@hadoop01 /usr/local/src]# wget http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.16.2.tar.gz

解压下载好的压缩包:

[root@hadoop01 /usr/local/src]# tar -zxvf hadoop-2.6.0-cdh5.16.2.tar.gz -C /usr/local

配置系统环境变量:

[root@hadoop01 ~]# vim /etc/profile
export HADOOP_HOME=/usr/local/hadoop-2.6.0-cdh5.16.2
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
[root@hadoop01 ~]# source /etc/profile

修改几个配置文件:

[root@hadoop01 ~]# cd $HADOOP_HOME
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/local/jdk/11  # 配置JDK的目录

# 配置 core
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop01:8020</value>
  </property>
</configuration>

# 配置 hdfs,设置副本因子和临时目录
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/data/hadoop/tmp</value>
  </property>
</configuration>

# 配置slave节点的ip或hostname
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/slaves
hadoop01

# 配置 yarn
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/yarn-site.xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

# 配置MapReduce
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/mapred-site.xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>

# 创建hadoop的临时目录
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# mkdir -p /data/hadoop/tmp

应用HDFS的配置:

[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# ./bin/hdfs namenode -format

启动所有组件:

[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# ./sbin/start-all.sh

启动成功后查看进程:

[root@hadoop01 ~]# jps
3344 SecondaryNameNode
2722 NameNode
3812 Jps
3176 DataNode
3578 NodeManager
3502 ResourceManager
[root@hadoop01 ~]# 

然后在浏览器中访问HDFS的web界面,默认端口是50070:


image.png

接着访问HDFS的YARN界面,默认端口是8088:


image.png

测试HDFS能否正常读写:

[root@hadoop01 ~]# hadoop fs -put anaconda-ks.cfg /  # 任意put一个文件到hdfs
[root@hadoop01 ~]# hadoop fs -ls /    # 查看hdfs中是否有该文件
Found 1 items  
-rw-r--r--   1 root supergroup       1269 2020-09-29 17:45 /anaconda-ks.cfg

经过测试,确认Hadoop环境是运行正常之后,我们就可以尝试将Flink应用放到YARN上运行了。


Flink on YARN两种方式

Flink on YARN 有两种模式:Session模式和Per-Job模式。在Session模式中多个 JobManager 共享 Dispatcher 和 YarnResourceManager。在这种模式下,需要先向 YARN 申请资源,初始化一个常驻服务在 YARN 上,后续提交的Job都将运行在这个Session上:


image.png

而Per-Job模式则相反,一个 JobManager 独享 Dispatcher 和 YarnResourceManager。也就是说每提交一个Job都新建一个Session,不同Job之间的资源是隔离的,不会互相影响:


image.png

想要深入了解的话可以参考官方文档:


Flink on YARN Session模式实操

首先将在 Flink部署及作业提交(On Flink Cluster) 一文中编译好的Flink目录拷贝到当前部署了Hadoop环境的机器上:

[root@hadoop01 ~]# scp -r 192.168.243.148:/usr/local/src/flink-release-1.11.2/flink-dist/target/flink-1.11.2-bin/flink-1.11.2/ /usr/local/flink

配置环境变量,否则Flink会报找不到Hadoop相关Class的异常:

[root@hadoop01 ~]# vim /etc/profile
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop

export HADOOP_CLASSPATH=$HADOOP_COMMON_HOME/lib:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mepreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/*

然后执行./bin/yarn-session.sh --help命令测试一下能否正常输出帮助信息:

[root@hadoop01 ~]# cd /usr/local/flink/
[root@hadoop01 /usr/local/flink]# ./bin/yarn-session.sh --help
...
Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on YARN
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
[root@hadoop01 /usr/local/flink]# 
  • 如果没配环境变量的话,执行这条命令就会报找不到类的错误

确认Flink可以正常找到Hadoop后,使用如下命令在 YARN 上创建一个常驻服务:

[root@hadoop01 /usr/local/flink]# ./bin/yarn-session.sh -jm 1024m -tm 2048m
...

JobManager Web Interface: http://hadoop01:37525    # 创建成功的话会输出JobManager的web访问地址
  • -jm:指定JobManager需要的内存资源
  • -tm:指定TaskManager需要的内存资源

使用浏览器打开 YARN 的web界面,正常情况下会有如下应用:


image.png

点击应用右边的 “ApplicationMaster” 可以跳转到Flink的dashboard。此时可以看到Flink Dashboard页面上任何数字都是0,应该就能看得出实际这只是启动了一个JobManager:


image.png
  • Tips:要想页面能够正常跳转,还得在浏览器所在主机的hosts文件中配置一下hadoop01这个主机名到IP的映射关系

接下来我们尝试一下提交作业到 YARN 上运行,首先准备好官方提供的测试文件,并put到HDFS中:

[root@hadoop01 ~]# wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
[root@hadoop01 ~]# hadoop fs -copyFromLocal LICENSE-2.0.txt /

然后执行如下命令,提交一个Word Count作业:

[root@hadoop01 ~]# cd /usr/local/flink/
[root@hadoop01 /usr/local/flink]# ./bin/flink run -m hadoop01:37525 ./examples/batch/WordCount.jar \
       --input hdfs://hadoop01:8020/LICENSE-2.0.txt --output hdfs://hadoop01:8020/wordcount-result.txt
  • Tips:这里的hadoop01:37525,是执行完yarn-session.sh命令输出的JobManager的访问地址

执行完成后,控制台会输出如下内容:

Job has been submitted with JobID 2240e11994cf8579a78e16a1984f08db
Program execution finished
Job with JobID 2240e11994cf8579a78e16a1984f08db has finished.
Job Runtime: 10376 ms

此时到“Completed Jobs”页面中,可以看到运行完成的作业及其信息:


image.png

除此之外,我们还可以查看该作业输出到HDFS中的结果文件:

[root@hadoop01 /usr/local/flink]# hadoop fs -ls /wordcount-result.txt
-rw-r--r--   1 root supergroup       4499 2020-09-29 20:25 /wordcount-result.txt
[root@hadoop01 /usr/local/flink]# hadoop fs -text /wordcount-result.txt

Flink on YARN Per-Job模式实操

首先将之前在 yarn 上运行的应用和相关进程给kill掉:

[root@hadoop01 ~]# yarn application -kill application_1601372571363_0001
[root@hadoop01 ~]# jps
6995 SecondaryNameNode
7204 ResourceManager
7305 NodeManager
11291 Jps
6734 NameNode
6830 DataNode
8942 FlinkYarnSessionCli
[root@hadoop01 ~]# kill 8942

Per-Job模式更简单,因为是提交一个作业就创建一次资源的,所以直接运行如下命令就可以提交一个Flink的Word Count作业到 yarn 上,不需要像Session模式那样事先去创建资源:

[root@hadoop01 /usr/local/flink]# ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

作业运行完成后,控制台会输出一堆统计结果。此时在 yarn 上可以看到该作业已经执行完成:


image.png

Flink Scala Shell的简单使用

在之前的演示中可以看到,提交的Flink作业都是以jar包形式存在的。如果我们在实际开发中,需要频繁修改代码提交到 yarn 上测试,那么就得频繁的打包,相对来说就有点麻烦。那么Flink有没有像Spark那样提供类似于 Spark Shell 的交互式编程终端用于简单的代码测试呢?答案是有的,Flink提供了PyFlink Shell和Scala Shell,可以执行Python和Scala代码。

这里简单演示下Flink Scala Shell的使用,执行如下命令打开Flink Scala Shell:

[root@hadoop01 /usr/local/flink]# ./bin/start-scala-shell.sh local
  • 这里的local表示在本地运行,除此之外还可以选择remoteyarn,具体可以使用--help参数进行查看

shell里调用API的方式还是一样的,只是环境变成了内置的变量,例如这里使用的benv就表示批处理的env:

scala> val dataSet = benv.readTextFile("file:///root/LICENSE-2.0.txt")
dataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@3110bb19

scala> dataSet.print
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342