Flink提供了多种部署方式,本文主要介绍local cluster、standalone cluser、yarn 3种常用的模式。
环境准备
- Java8或者11
- Zookeeper
- Hadoop
- flink安装包 flink-1.10.1-bin-scala_2.11.tgz
Local Cluster
Local Cluster(本地集群模式)会在本地启动一个JVM,主要用于调试代码。
前置条件
# java 版本
java -version
# output
openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-8u212-b03-0ubuntu1.18.04.1-b03)
OpenJDK 64-Bit Server VM (build 25.212-b03, mixed mode)
部署
# 解压
tar -zxvf flink-1.10.1-bin-scala_2.11.tgz
cd flink-1.10.1
# 启动
./bin/start-cluster.sh
浏览器输入http://localhost:8081,查看WEBUI监控界面。
# 日志
ls -l ${FLINK_HOME}/log
# output
-rw-rw-rw- 1 zfylin zfylin 11464 Aug 13 23:13 flink-zfylin-standalonesession-0-z.log
-rw-rw-rw- 1 zfylin zfylin 11977 Aug 13 23:13 flink-zfylin-standalonesession-0-z.log.1
-rw-rw-rw- 1 zfylin zfylin 0 Aug 13 23:13 flink-zfylin-standalonesession-0-z.out
-rw-rw-rw- 1 zfylin zfylin 0 Aug 13 23:01 flink-zfylin-standalonesession-0-z.out.1
-rw-rw-rw- 1 zfylin zfylin 16136 Aug 13 23:13 flink-zfylin-taskexecutor-0-z.log
-rw-rw-rw- 1 zfylin zfylin 17255 Aug 13 23:13 flink-zfylin-taskexecutor-0-z.log.1
-rw-rw-rw- 1 zfylin zfylin 0 Aug 13 23:13 flink-zfylin-taskexecutor-0-z.out
-rw-rw-rw- 1 zfylin zfylin 0 Aug 13 23:01 flink-zfylin-taskexecutor-0-z.out.1
# 停止
./bin/stop-cluster.sh
Standalone Cluster
前置条件
- java8或者java11
- 支持ssh免密登录的3台节点, hadoop01、hadoop02、hadoop03
部署
1. 配置
cd ${FLINK_HOME}/conf
vim flink-conf.yaml
# 修改
jobmanager.rpc.address: hadoop01
vim slaves
# 修改
hadoop02
hadoop03
配置说明:
节点 | 角色 |
---|---|
hadoop01 | jobmanager |
hadoop02 | taskmanager |
hadoop03 | taskmanager |
最后配置好的flink包从hadoop01分发到hadoop02、hadoop03
scp -r ${FLINK_HOME} hadoop01:${FLINK_HOME}
scp -r ${FLINK_HOME} hadoop02:${FLINK_HOME}
2. 启动
cd ${FLINK_HOME}
./bin/start-cluster.sh
浏览器输入http://hadoop01:8081,查看WEBUI监控界面。
3. 日志
日志都在 ${FLINK_HOME}/log
4. 停止
cd ${FLINK_HOME}
./bin/stop-cluster.sh
YARN
前置条件
- 已经安装好Hadoop 集群(版本2.4.1以上)
- Flink集成Hadoop
Hadoop 集成
Hadoop集成可以通过以下两种方式:
-
设置环境变量来引用Hadoop配置
HADOOP_CONF_DIR
。HADOOP_CONF_DIR=/path/to/etc/hadoop
-
添加hadoop相关类
-
export HADOOP_CLASSPATH=`hadoop classpath`
-
部署
Flink 支持基于YARN模式部署。YARN模式部署方式有两种
- yarn-session
- yarn-per-job
Flink on yarn 交互流程
YRAN客户端需要访问Hadoop的相关配置文件,从而可以连接YARN资源管理器和HDFS。它使用下面的规则来决定Hadoop配置:
- 判断YARN_CONF_DIR,HADOOP_CONF_DIR或HADOOP_CONF_PATH等环境变量是否设置了(按照这些变量的顺序判断)。如果它们中有一个被设置了,那么就会读取其中的配置。
- 如果上面的规则失败了(如果正确安装了 YARN 的话,这不应该会发生),那么客户端将会使用HADOOP_HOME环境变量。如果这个环境变量设置了,客户端将会尝试访问
$HADOOP_HOME/etc/hadoop(Hadoop 2)
或者$HADOOP_HOME/conf (Hadoop 1)
路径下文件。
- 当启动一个新的flink yarn session,客户端首先判断所请求的资源(容器和内存)是否可用。在那之后,它会把包含了Flink以及相关配置的jar包上传到HDFS。
- 客户端的下一步是请求一个 YARN 容器来启动ApplicationMaster
- 因为客户端已经将配置和jar文件作为资源向容器注册了,所以运行在特定机器上的NodeManager 会负责准备容器(例如,下载一些文件),一旦上面的步骤完成了,ApplicationMaster (AM)将会被启动。AM将负责为Flink提供WEB界面服务,Flink 用来提供服务的端口是由用户 + 应用程序 id 作为偏移配置决定的。这样的措施使得用户能够并行地执行多个 Flink YARN session。JobManager和AM是运行在同一个容器中的,一旦它们成功地启动了,AM就会知道JobManager 的地址(就是它自己的地址),它会为TaskManager 生成一个新的Flink配置文件(这样TaskManager才能连上 JobManager)。这些新的配置文件同样会被上传到HDFS上。
- AM开始为Flink的TaskManager 分配容器,这会从HDFS下载jar文件和修改过的配置文件。
- Flink服务就在YARN上启动完成并准备接受任务了。
Yarn-session
- 在yarn初始化一个flink集群,申请指定的资源,申请后的资源大小不变。这个flink集群会常驻在yarn集群,直到手动停止。
- Flink 任务都提交这个flink集群,共用这个集群的资源,如果集群资源使用完了,下一个flink任务就无法提交,需要等到其中一个任务执行完成释放了资源,下一个任务才能正常提交。
- 这个模式总的资源限制在session中,适合特定的运行环境以及测试环境。
启动集群
我们可以通过./bin/yarn-session.sh
脚本启动yarn-session。参数如下:
./bin/yarn-session.sh
Usage:
Optional
-at,--applicationType <arg> Set a custom application type for the application on YARN
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached mode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-nl,--nodeLabel <arg> Specify YARN node label for the YARN application
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
在启动的是可以指定TaskManager的个数以及内存(默认是1G),也可以指定JobManager的内存,但是JobManager的个数只能是一个
# 启动4个taskmanager,每个taskmanager 8G,slot 8即每个taskmanger两个slot
./bin/yarn-session.sh -n 4 -tm 8192 -s 8
提交任务
使用 ./bin/flink 提交任务,参数如下:
./bin/flink
./flink <ACTION> [OPTIONS] [ARGUMENTS]
The following actions are available:
Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
-c,--class <classname> Class with the program entry point
("main()" method). Only needed if the
JAR file does not specify the class in
its manifest.
-C,--classpath <url> Adds a URL to each user code
classloader on all nodes in the
cluster. The paths must specify a
protocol (e.g. file://) and be
accessible on all nodes (e.g. by means
of a NFS share). You can use this
option multiple times for specifying
more than one URL. The protocol must
be supported by the {@link
java.net.URLClassLoader}.
-d,--detached If present, runs the job in detached
mode
-n,--allowNonRestoredState Allow to skip savepoint state that
cannot be restored. You need to allow
this if you removed an operator from
your program that was part of the
program when the savepoint was
triggered.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
……
可以使用run选项运行Flink任务。这个脚本可以自动获取到YARN session的地址,所以可以不指定--jobmanager参数。
我们以Flink自带的WordCount程序为例进行介绍,先将测试文件上传到HDFS上:
hadoop fs -copyFromLocal LICENSE hdfs:///tmp
./bin/flink run ./examples/batch/WordCount.jar --input hdfs:///tmp/LICENSE
# 输出结果
(0,9)
(1,6)
(10,3)
(12,1)
(15,1)
(17,1)
(2,9)
(2004,1)
(2010,2)
(2011,2)
(2012,5)
(2013,4)
(2014,6)
(2015,7)
(2016,2)
(3,6)
(4,4)
(5,3)
(50,1)
(6,3)
(7,3)
(8,2)
(9,2)
(a,25)
(above,4)
(acceptance,1)
(accepting,3)
(act,1)
Yarn-per-job
- 每次任务提交都会创建一个Flink集群,任务之间相互独立,互不影响,方便管理。
- 任务执行完成后,创建的Flink集群销毁,资源释放。
- 这种方式需要确保yarn集群资源足够,适合生产环境。
提交任务
和yarn-session模式一样,使用run选项运行Flink任务。
# -m yarn-cluster 表示是yarn-per-job模式
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar \
--input hdfs:///tmp/LICENSE \
--output hdfs:///tmp/result.txt
HA(高可用)
Flink on Yarn 的高可用配置只需要一个 JobManager。当 JobManager 发生失败时,Yarn 负责将其重新启动。
配置AM在尝试重启的最大次数(yarn-site.xml)
在$HADOOP_CONF_DIR
的yarn-site.xml添加如下配置:
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
以上配置是application master在重启时,尝试的最大次数是为4。
配置Application Attempts(flink-conf.yaml)
在$FLINK_HOME/conf
的flink-conf.yaml中添加如下配置:
yarn.application-attempts: 10
以上配置意味着如果flink任务启动失败,YARN 会再重试 9 次(9 次重试 + 1 次启动),如果 YARN 启动 10 次作业还失败,则 YARN 才会将该任务的状态置为失败。如果发生进程抢占,节点硬件故障或重启,NodeManager 重新同步等,YARN 会继续尝试启动应用。 这些重启不计入 yarn.application-attempts 个数中。