Flink入门-部署

Flink提供了多种部署方式,本文主要介绍local cluster、standalone cluser、yarn 3种常用的模式。

环境准备

Local Cluster

Local Cluster(本地集群模式)会在本地启动一个JVM,主要用于调试代码。

前置条件

# java 版本
java -version
# output
openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-8u212-b03-0ubuntu1.18.04.1-b03)
OpenJDK 64-Bit Server VM (build 25.212-b03, mixed mode)

部署

# 解压
tar -zxvf flink-1.10.1-bin-scala_2.11.tgz
cd flink-1.10.1
# 启动
./bin/start-cluster.sh

浏览器输入http://localhost:8081,查看WEBUI监控界面。

# 日志
ls -l ${FLINK_HOME}/log
# output
-rw-rw-rw- 1 zfylin zfylin 11464 Aug 13 23:13 flink-zfylin-standalonesession-0-z.log
-rw-rw-rw- 1 zfylin zfylin 11977 Aug 13 23:13 flink-zfylin-standalonesession-0-z.log.1
-rw-rw-rw- 1 zfylin zfylin     0 Aug 13 23:13 flink-zfylin-standalonesession-0-z.out
-rw-rw-rw- 1 zfylin zfylin     0 Aug 13 23:01 flink-zfylin-standalonesession-0-z.out.1
-rw-rw-rw- 1 zfylin zfylin 16136 Aug 13 23:13 flink-zfylin-taskexecutor-0-z.log
-rw-rw-rw- 1 zfylin zfylin 17255 Aug 13 23:13 flink-zfylin-taskexecutor-0-z.log.1
-rw-rw-rw- 1 zfylin zfylin     0 Aug 13 23:13 flink-zfylin-taskexecutor-0-z.out
-rw-rw-rw- 1 zfylin zfylin     0 Aug 13 23:01 flink-zfylin-taskexecutor-0-z.out.1
# 停止
./bin/stop-cluster.sh

Standalone Cluster

前置条件

  • java8或者java11
  • 支持ssh免密登录的3台节点, hadoop01、hadoop02、hadoop03

部署

1. 配置

cd ${FLINK_HOME}/conf
vim flink-conf.yaml 

# 修改
jobmanager.rpc.address: hadoop01
vim slaves

# 修改
hadoop02
hadoop03

配置说明:

节点 角色
hadoop01 jobmanager
hadoop02 taskmanager
hadoop03 taskmanager

最后配置好的flink包从hadoop01分发到hadoop02、hadoop03

scp -r ${FLINK_HOME} hadoop01:${FLINK_HOME}
scp -r ${FLINK_HOME} hadoop02:${FLINK_HOME}

2. 启动

cd ${FLINK_HOME}
./bin/start-cluster.sh

浏览器输入http://hadoop01:8081,查看WEBUI监控界面。

3. 日志

日志都在 ${FLINK_HOME}/log

4. 停止

cd ${FLINK_HOME}
./bin/stop-cluster.sh

YARN

前置条件

  • 已经安装好Hadoop 集群(版本2.4.1以上)
  • Flink集成Hadoop

Hadoop 集成

Hadoop集成可以通过以下两种方式:

部署

Flink 支持基于YARN模式部署。YARN模式部署方式有两种

  • yarn-session
  • yarn-per-job

Flink on yarn 交互流程

1.png

YRAN客户端需要访问Hadoop的相关配置文件,从而可以连接YARN资源管理器和HDFS。它使用下面的规则来决定Hadoop配置:

  • 判断YARN_CONF_DIR,HADOOP_CONF_DIR或HADOOP_CONF_PATH等环境变量是否设置了(按照这些变量的顺序判断)。如果它们中有一个被设置了,那么就会读取其中的配置。
  • 如果上面的规则失败了(如果正确安装了 YARN 的话,这不应该会发生),那么客户端将会使用HADOOP_HOME环境变量。如果这个环境变量设置了,客户端将会尝试访问$HADOOP_HOME/etc/hadoop(Hadoop 2)或者$HADOOP_HOME/conf (Hadoop 1)路径下文件。
  1. 当启动一个新的flink yarn session,客户端首先判断所请求的资源(容器和内存)是否可用。在那之后,它会把包含了Flink以及相关配置的jar包上传到HDFS。
  2. 客户端的下一步是请求一个 YARN 容器来启动ApplicationMaster
  3. 因为客户端已经将配置和jar文件作为资源向容器注册了,所以运行在特定机器上的NodeManager 会负责准备容器(例如,下载一些文件),一旦上面的步骤完成了,ApplicationMaster (AM)将会被启动。AM将负责为Flink提供WEB界面服务,Flink 用来提供服务的端口是由用户 + 应用程序 id 作为偏移配置决定的。这样的措施使得用户能够并行地执行多个 Flink YARN session。JobManager和AM是运行在同一个容器中的,一旦它们成功地启动了,AM就会知道JobManager 的地址(就是它自己的地址),它会为TaskManager 生成一个新的Flink配置文件(这样TaskManager才能连上 JobManager)。这些新的配置文件同样会被上传到HDFS上。
  4. AM开始为Flink的TaskManager 分配容器,这会从HDFS下载jar文件和修改过的配置文件。
  5. Flink服务就在YARN上启动完成并准备接受任务了。

Yarn-session

image-20200817171623439.png
  1. 在yarn初始化一个flink集群,申请指定的资源,申请后的资源大小不变。这个flink集群会常驻在yarn集群,直到手动停止。
  2. Flink 任务都提交这个flink集群,共用这个集群的资源,如果集群资源使用完了,下一个flink任务就无法提交,需要等到其中一个任务执行完成释放了资源,下一个任务才能正常提交。
  3. 这个模式总的资源限制在session中,适合特定的运行环境以及测试环境。
启动集群

我们可以通过./bin/yarn-session.sh脚本启动yarn-session。参数如下:

./bin/yarn-session.sh
Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on YARN
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

在启动的是可以指定TaskManager的个数以及内存(默认是1G),也可以指定JobManager的内存,但是JobManager的个数只能是一个

# 启动4个taskmanager,每个taskmanager 8G,slot 8即每个taskmanger两个slot
./bin/yarn-session.sh -n 4 -tm 8192 -s 8
提交任务

使用 ./bin/flink 提交任务,参数如下:

./bin/flink
./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>               Class with the program entry point
                                          ("main()" method). Only needed if the
                                          JAR file does not specify the class in
                                          its manifest.
     -C,--classpath <url>                 Adds a URL to each user code
                                          classloader  on all nodes in the
                                          cluster. The paths must specify a
                                          protocol (e.g. file://) and be
                                          accessible on all nodes (e.g. by means
                                          of a NFS share). You can use this
                                          option multiple times for specifying
                                          more than one URL. The protocol must
                                          be supported by the {@link
                                          java.net.URLClassLoader}.
     -d,--detached                        If present, runs the job in detached
                                          mode
     -n,--allowNonRestoredState           Allow to skip savepoint state that
                                          cannot be restored. You need to allow
                                          this if you removed an operator from
                                          your program that was part of the
                                          program when the savepoint was
                                          triggered.
     -p,--parallelism <parallelism>       The parallelism with which to run the
                                          program. Optional flag to override the
                                          default value specified in the
                                          configuration.
    ……

可以使用run选项运行Flink任务。这个脚本可以自动获取到YARN session的地址,所以可以不指定--jobmanager参数。

我们以Flink自带的WordCount程序为例进行介绍,先将测试文件上传到HDFS上:

hadoop fs -copyFromLocal LICENSE hdfs:///tmp
./bin/flink run ./examples/batch/WordCount.jar --input hdfs:///tmp/LICENSE
# 输出结果
(0,9)
(1,6)
(10,3)
(12,1)
(15,1)
(17,1)
(2,9)
(2004,1)
(2010,2)
(2011,2)
(2012,5)
(2013,4)
(2014,6)
(2015,7)
(2016,2)
(3,6)
(4,4)
(5,3)
(50,1)
(6,3)
(7,3)
(8,2)
(9,2)
(a,25)
(above,4)
(acceptance,1)
(accepting,3)
(act,1)

Yarn-per-job

image-20200817173429527.png
  1. 每次任务提交都会创建一个Flink集群,任务之间相互独立,互不影响,方便管理。
  2. 任务执行完成后,创建的Flink集群销毁,资源释放。
  3. 这种方式需要确保yarn集群资源足够,适合生产环境。
提交任务

和yarn-session模式一样,使用run选项运行Flink任务。

# -m yarn-cluster 表示是yarn-per-job模式
./bin/flink run -m yarn-cluster  ./examples/batch/WordCount.jar      \
          --input hdfs:///tmp/LICENSE                            \
          --output hdfs:///tmp/result.txt

HA(高可用)

Flink on Yarn 的高可用配置只需要一个 JobManager。当 JobManager 发生失败时,Yarn 负责将其重新启动。

配置AM在尝试重启的最大次数(yarn-site.xml)

$HADOOP_CONF_DIR 的yarn-site.xml添加如下配置:

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>

以上配置是application master在重启时,尝试的最大次数是为4。

配置Application Attempts(flink-conf.yaml)

$FLINK_HOME/conf 的flink-conf.yaml中添加如下配置:

yarn.application-attempts: 10  

以上配置意味着如果flink任务启动失败,YARN 会再重试 9 次(9 次重试 + 1 次启动),如果 YARN 启动 10 次作业还失败,则 YARN 才会将该任务的状态置为失败。如果发生进程抢占,节点硬件故障或重启,NodeManager 重新同步等,YARN 会继续尝试启动应用。 这些重启不计入 yarn.application-attempts 个数中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349