Centos7 安装Flink1.16

1 集群角色

image.png

2 Flink集群搭建

2.1 集群启动

0)集群规划

节点服务器 hadoop101 hadoop102 hadoop103
角色 JobManager
TaskManager
TaskManager TaskManager

具体安装部署步骤如下:
1)下载并解压安装包

(1)下载安装包flink-1.16.1-bin-scala_2.12.tgz,将该jar包上传到hadoop101节点服务器的/opt/software路径上。

(2)在/opt/software路径上解压flink-1.16.1-bin-scala_2.12.tgz到/opt/module路径上。

[yobhel@hadoop101 software]$ tar -zxvf flink-1.16.1-bin-scala_2.12.tgz -C /opt/module/

2)修改集群配置

(1)进入conf路径,修改flink-conf.yaml文件,指定hadoop101节点服务器为JobManager

[yobhel@hadoop101 conf]$ vim flink-conf.yaml

修改如下内容:

# JobManager节点地址.
jobmanager.rpc.address: hadoop101
jobmanager.bind-host: hadoop101
rest.address: hadoop101
rest.bind-address: hadoop101
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: hadoop101
taskmanager.host: hadoop101

之后分发flink到hadoop102和hadoop103机器,并修改以下参数为当前主机名,例如以修改hadoop102机器的配置为例:

taskmanager.bind-host: hadoop102
taskmanager.host: hadoop103

(2)在需要执行群起命令的机器,修改workers文件

[yobhel@hadoop101 conf]$ vim workers

修改如下内容:

hadoop101
hadoop102
hadoop103

(3)在需要执行群起命令的机器,修改masters文件

[yobhel@hadoop101 conf]$ vim masters

修改如下内容:

hadoop101:8081

(4)另外,在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下:

  • jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。

  • taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。

  • taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。

  • parallelism.default:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。

关于Slot和并行度的概念,我们会在下一章做详细讲解。

3)启动集群

(1)在hadoop101节点服务器上执行start-cluster.sh启动Flink集群:

[yobhel@hadoop101 flink-1.16.1]$ bin/start-cluster.sh

(2)查看进程情况:

[yobhel@hadoop101 flink-1.16.1]$ jpsall 
=============== hadoop101 ===============
4453 StandaloneSessionClusterEntrypoint
4458 TaskManagerRunner
4533 Jps
=============== hadoop102 ===============
2872 TaskManagerRunner
2941 Jps
=============== hadoop103 ===============
2948 Jps
2876 TaskManagerRunner

4)访问Web UI

启动成功后,同样可以访问http://hadoop101:8081对flink集群和任务进行监控管理。

image.png

2.2 命令行向集群提交作业

除了通过WEB UI界面提交任务之外,也可以直接通过命令行来提交任务。这里为方便起见,我们可以先把jar包直接上传到目录flink-1.16.1下

(1)首先需要启动集群。

[yobhel@hadoop101 flink-1.16.1]$ bin/start-cluster.sh

(2)在hadoop101中执行以下命令启动netcat。

[yobhel@hadoop101 flink-1.16.1]$ nc -lk 7777

(3)将flink程序运行jar包上传到/opt/module/flink-1.16.1路径。

(4)进入到flink的安装路径下,在命令行使用flink run命令提交作业。

[yobhel@hadoop101 flink-1.16.1]$ bin/flink run -m hadoop101:8081 -c com.yobhel.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar

这里的参数 -m指定了提交到的JobManager,-c指定了入口类。

(5)在浏览器中打开Web UI,http://hadoop101:8081查看应用执行情况

用netcat输入数据,可以在TaskManager的标准输出(Stdout)看到对应的统计结果


image.png

(6)在/opt/module/flink-1.16.1/log路径中,可以查看TaskManager节点。

[yobhel@hadoop101 log]$ cat flink-yobhel-standalonesession-0-hadoop101.out

(hello,1)
(hello,2)
(flink,1)
(hello,3)
(scala,1)

3 部署模式

在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)。
它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行——客户端(Client)还是JobManager。

3.1 会话模式(Session Mode)

image.png

3.2 单作业模式(Per-Job Mode)

image.png

3.3 应用模式(Application Mode)

image.png

这里我们所讲到的部署模式,相对是比较抽象的概念。实际应用时,一般需要和资源管理平台结合起来,选择特定的模式来分配资源、部署应用。接下来,我们就针对不同的资源提供者的场景,具体介绍Flink的部署方式。

4 Standalone运行模式(了解)

独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。

4.1 会话模式部署

我们在第2节用的就是Standalone集群的会话模式部署。
提前启动集群,并通过Web页面客户端提交任务(可以多个任务,但是集群资源固定)。


image.png

4.2 单作业模式部署

Flink的Standalone集群并不支持单作业模式部署。因为单作业模式需要借助一些资源管理平台。

4.3 应用模式部署

应用模式下不会提前创建集群,所以不能调用start-cluster.sh脚本。我们可以使用同样在bin目录下的standalone-job.sh来创建一个JobManager。


image.png

具体步骤如下:
(0)环境准备。在hadoop101中执行以下命令启动netcat。

[yobhel@hadoop101 flink-1.16.1]$ nc -lk 7777

(1)进入到Flink的安装路径下,将应用程序的jar包放到lib/目录下。

[yobhel@hadoop101 flink-1.16.1]$ mv FlinkTutorial-1.0-SNAPSHOT.jar lib/

(2)执行以下命令,启动JobManager。

[yobhel@hadoop101 flink-1.16.1]$ bin/standalone-job.sh start --job-classname com.yobhel.wc.SocketStreamWordCount

这里我们直接指定作业入口类,脚本会到lib目录扫描所有的jar包。
(3)同样是使用bin目录下的脚本,启动TaskManager。

[yobhel@hadoop101 flink-1.16.1]$ bin/taskmanager.sh start

(4)在hadoop101上模拟发送单词数据。

[yobhel@hadoop101 ~]$ nc -lk 7777
hello

(5)在hadoop101:8081地址中观察输出数据


image.png

(6)如果希望停掉集群,同样可以使用脚本,命令如下。

[yobhel@hadoop101 flink-1.16.1]$ bin/taskmanager.sh stop
[yobhel@hadoop101 flink-1.16.1]$ bin/standalone-job.sh stop

5 YARN运行模式(重点)

YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。
5.1 相关准备和配置
在将Flink任务部署至YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务。
具体配置步骤如下:
(1)配置环境变量,增加环境变量配置如下:

$ sudo vim /etc/profile.d/my_env.sh

HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

(2)启动Hadoop集群,包括HDFS和YARN。

[yobhel@hadoop101 hadoop-3.1.3]$ start-dfs.sh
[yobhel@hadoop102 hadoop-3.1.3]$ start-yarn.sh

(3)在hadoop101中执行以下命令启动netcat。

[yobhel@hadoop101 flink-1.16.1]$ nc -lk 7777

5.2 会话模式部署

YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN Session)来启动Flink集群。具体步骤如下:
1)启动集群
(1)启动Hadoop集群(HDFS、YARN)。
(2)执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群。
[yobhel@hadoop101 flink-1.16.1]$ bin/yarn-session.sh -nm test
可用参数解读:

  • -d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
  • -jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。
  • -nm(--name):配置在YARN UI界面上显示的任务名。
  • -qu(--queue):指定YARN队列名。
  • -tm(--taskManager):配置每个TaskManager所使用内存。
    注意:Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量,YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的。
    YARN Session启动之后会给出一个Web UI地址以及一个YARN application ID,如下所示,用户可以通过Web UI或者命令行两种方式提交作业。
2022-11-17 15:20:52,711 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface hadoop103:40825 of application 'application_1668668287070_0005'.
JobManager Web Interface: http://hadoop103:40825

2)提交作业
(1)通过Web UI提交作业
这种方式比较简单,与上文所述Standalone部署模式基本相同。


image.png

(2)通过命令行提交作业
① 将FlinkTutorial-1.0-SNAPSHOT.jar任务上传至集群。
② 执行以下命令将该任务提交到已经开启的Yarn-Session中运行。

[yobhel@hadoop101 flink-1.16.1]$ bin/flink run
-c com.yobhel.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar

客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址,JobManager的地址在YARN Session的启动页面中可以找到。
③ 任务提交成功后,可在YARN的Web UI界面查看运行情况。hadoop102:8088。


image.png

从上图中可以看到我们创建的Yarn-Session实际上是一个Yarn的Application,并且有唯一的Application ID。
④也可以通过Flink的Web UI页面查看提交任务的运行情况,如下图所示。


image.png

5.3 单作业模式部署

在YARN环境中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群。
(1)执行命令提交作业。

[yobhel@hadoop101 flink-1.16.1]$ bin/flink run -d -t yarn-per-job -c com.yobhel.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar

注意:如果启动过程中报如下异常。

Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’.
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders

解决办法:在flink的/opt/module/flink-1.16.1/conf/flink-conf.yaml配置文件中设置

[yobhel@hadoop101 conf]$ vim flink-conf.yaml

classloader.check-leaked-classloader: false

(2)在YARN的ResourceManager界面查看执行情况。


image.png

点击可以打开Flink Web UI页面进行监控,如下图所示:


image.png

(3)可以使用命令行查看或取消作业,命令如下。

[yobhel@hadoop101 flink-1.16.1]$ bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

[yobhel@hadoop101 flink-1.16.1]$ bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

这里的application_XXXX_YY是当前应用的ID,<jobId>是作业的ID。注意如果取消作业,整个Flink集群也会停掉。

5.4 应用模式部署

应用模式同样非常简单,与单作业模式类似,直接执行flink run-application命令即可。
(1)执行命令提交作业。

[yobhel@hadoop101 flink-1.16.1]$ bin/flink run-application -t yarn-application -c com.yobhel.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar 

(2)在命令行中查看或取消作业。

[yobhel@hadoop101 flink-1.16.1]$ bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

[yobhel@hadoop101 flink-1.16.1]$ bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

(3)也可以通过yarn.provided.lib.dirs配置选项指定位置,将jar上传到远程。

[yobhel@hadoop101 flink-1.16.1]$ bin/flink run-application -t yarn-application  -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"   hdfs://myhdfs/jars/my-application.jar

这种方式下jar可以预先上传到HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。

6 历史服务器

运行 Flinkjob 的集群一旦停止,只能去yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。

Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。

此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。

1)创建存储目录

hadoop fs -mkdir -p /logs/flink-job

2)在 flink-config.yaml中添加如下配置

jobmanager.archive.fs.dir: hdfs://hadoop101:8020/logs/flink-job
historyserver.web.address: hadoop101
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://hadoop101:8020/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000

3)启动历史服务器

bin/historyserver.sh start

4)停止历史服务器

bin/historyserver.sh  stop

5)在浏览器地址栏输入:<u>http://hadoop101:8082</u>查看已经停止的job的统计信息

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容