概览
JobManager协调每个Flink部署。 它负责调度和资源管理。
默认情况下,每个Flink集群只有一个JobManager实例。 这将创建一个单点故障(SPOF):如果JobManager崩溃,则无法提交任何新程序,并且正在运行的程序也会失败。
使用JobManager高可用性,您可以从JobManager故障中恢复并消除SPOF。 您可以为独立群集和YARN群集配置高可用性。
在Flink Wiki中的 JobManager High Availability 中查看更多HA实现细节 。
独立群集高可用性
独立集群的JobManager高可用性的总体思想是,随时有一个JobManager leader,并且有多个备用JobManager可以在leader失败的情况下接管leader。 这样可以保证不会出现单点故障,并且只要待机JobManager处于领导地位,程序就可以取得进展。 备用JobManager实例和主JobManager实例之间没有明显区别。 每个JobManager都可以充当主角色或备用角色。
作为示例,请考虑以下三个JobManager实例的设置:
配置
要启用JobManager高可用性,您必须将高可用性模式设置为zookeeper,配置ZooKeeper quorum并使用所有JobManager主机及其Web UI端口设置主文件。
Flink利用ZooKeeper在所有正在运行的JobManager实例之间进行分布式协调。 ZooKeeper是Flink的一项独立服务,该服务通过leader选举和轻量级一致状态存储提供高度可靠的分布式协调。 查看 ZooKeeper’s Getting Started Guide,了解有关ZooKeeper的更多信息。 Flink包含用于引导简单的ZooKeeper安装的脚本。
Masters File (masters)
为了启动HA集群,请在conf / masters中配置masters文件:
masters文件:masters文件包含启动了JobManager的所有主机,以及Web用户界面绑定的端口。
jobManagerAddress1:webUIPort1
[...]
jobManagerAddressX:webUIPortX
默认情况下,作业管理器将选择一个随机端口进行进程间通信。 您可以通过high-availability.jobmanager.port键更改此设置。 此key接受单个端口(例如50010),范围(50000-50025)或两者的组合(50010,50011,50020-50025,50050-50075)。
Config File (flink-conf.yaml)
为了启动HA集群,请向 conf / flink-conf.yaml添加以下配置key:
高可用性模式(必需):必须在conf / flink-conf.yaml中将高可用性模式设置为zookeeper,以启用高可用性模式。 或者,可以将此选项设置为Flink用来创建HighAvailabilityServices实例的工厂类的FQN。
high-availability: zookeeper
ZooKeeper quorum (必需):ZooKeeper quorum是ZooKeeper服务器的复制组,它们提供分布式协调服务。
high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181
每个addressX:port都引用一个ZooKeeper服务器,Flink在给定的地址和端口可以访问该服务器。
ZooKeeper root(推荐):根ZooKeeper节点,所有群集节点都放置在该根节点下。
high-availability.zookeeper.path.root: /flink
ZooKeeper cluster-id(推荐):cluster-id ZooKeeper节点,在该节点下放置了群集的所有必需协调数据。
high-availability.cluster-id: /default_ns # important: customize per cluster
重要说明:在运行YARN群集,每作业YARN会话或在另一个群集管理器上时,您不应手动设置此值。 在这些情况下,将根据应用程序ID自动生成cluster-id。 手动设置cluster-id会覆盖YARN中的此行为。 依次使用-z CLI选项指定集群ID会覆盖手动配置。 如果您在裸机上运行多个Flink HA群集,则必须为每个群集手动配置单独的群集ID。
存储目录(必需):JobManager元数据保留在文件系统storageDir中,并且仅指向此状态的指针存储在ZooKeeper中。
high-availability.storageDir: hdfs:///flink/recovery
storageDir存储恢复JobManager故障所需的所有元数据。
配置了主服务器和ZooKeeper quorum后,您可以照常使用提供的群集启动脚本。 他们将启动HA集群。 请记住,调用脚本时必须运行ZooKeeper quorum,并确保为要启动的每个HA群集配置单独的ZooKeeper根路径。
Example: Standalone Cluster with 2 JobManagers
1 在conf / flink-conf.yaml中配置高可用性模式和ZooKeeper quorum:
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: hdfs:///flink/recovery
2 Configure masters in conf/masters:
localhost:8081
localhost:8082
3 在conf / zoo.cfg中配置ZooKeeper服务器(目前,每台机器只能运行一个ZooKeeper服务器):
server.0=localhost:2888:3888
4 Start ZooKeeper quorum:
$ bin/start-zookeeper-quorum.sh
Starting zookeeper daemon on host localhost.
5 Start an HA-cluster:
$ bin/start-cluster.sh
Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
Starting standalonesession daemon on host localhost.
Starting standalonesession daemon on host localhost.
Starting taskexecutor daemon on host localhost.
6 Stop ZooKeeper quorum and cluster:
$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 7647) on localhost.
Stopping standalonesession daemon (pid: 7495) on host localhost.
Stopping standalonesession daemon (pid: 7349) on host localhost.
$ bin/stop-zookeeper-quorum.sh
Stopping zookeeper daemon (pid: 7101) on host localhost.
YARN Cluster High Availability
在运行高度可用的YARN群集时,我们不会运行多个JobManager(ApplicationMaster)实例,而只能运行一个实例,当实例出现故障时,YARN会重新启动该实例。 确切的行为取决于您使用的特定YARN版本。
Configuration
最大应用程序主尝试次数(yarn-site.xml)
您必须在yarn-site.xml中为YARN设置配置应用程序主数据库的最大尝试次数:
当前YARN版本的默认值为2(表示可以容忍单个JobManager故障)。
Application Attempts (flink-conf.yaml)
除了高可用性配置(参见上文),您还必须在conf / flink-conf.yaml中配置最大尝试次数:
yarn.application-attempts: 10
这意味着在YARN使应用程序失败之前,可以为失败的尝试将应用程序重新启动9次(9次重试+ 1次初始尝试)。 如果YARN操作需要,则YARN可以执行其他重新启动:抢占,节点硬件故障或重新启动,或NodeManager重新同步。 这些重新启动不计入yarn.application-attempts,请参阅Jian Fang的博客文章。 请务必注意,yarn.resourcemanager.am.max-attempts是应用程序重新启动的上限。 因此,在Flink中设置的应用程序尝试次数不能超过启动YARN的YARN群集设置。
Container Shutdown Behaviour
YARN 2.3.0 <版本<2.4.0。 如果应用程序主服务器失败,则所有容器都将重新启动。
YARN 2.4.0 <版本<2.6.0。 TaskManager容器在应用程序主服务器发生故障时保持活动状态。 这样的优点是启动时间更快,并且用户不必等待再次获取容器资源。
YARN 2.6.0 <=版本:将尝试失败的有效性间隔设置为Flinks的Akka超时值。 尝试失败有效性间隔表示只有在系统在一个间隔内看到最大次数的应用程序尝试后,才会终止应用程序。 这样可以避免长时间的工作会耗尽应用程序的尝试次数。
注意:Hadoop YARN 2.4.0有一个主要错误(已在2.5.0中修复),阻止容器从重新启动的Application Master / Job Manager容器重新启动。 有关详细信息,请参见FLINK-4142。 对于YARN上的高可用性设置,我们建议至少使用Hadoop 2.5.0。
Example: Highly Available YARN Session
1 Configure HA mode and ZooKeeper quorum in conf/flink-conf.yaml:
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10
2 Configure ZooKeeper server in conf/zoo.cfg (currently it’s only possible to run a single ZooKeeper server per machine):
server.0=localhost:2888:3888
3 Start ZooKeeper quorum:
$ bin/start-zookeeper-quorum.sh
Starting zookeeper daemon on host localhost.
4 Start an HA-cluster:
$ bin/yarn-session.sh -n 2
Configuring for Zookeeper Security
如果ZooKeeper使用Kerberos在安全模式下运行,则可以根据需要在flink-conf.yaml中覆盖以下配置:
有关用于Kerberos安全性的Flink配置的更多信息,请参见此处。 您还可以在此处找到有关Flink如何在内部设置基于Kerberos的安全性的更多详细信息。
Zookeeper Versions
Flink随附了用于3.4和3.5的单独Zookeeper客户端,其中3.4位于发行版的lib目录中,因此默认使用,而3.5放置在opt目录中。
3.5客户端允许您通过SSL保护Zookeeper连接,但可能不适用于3.4- Zookeeper安装。
您可以通过将任一jar放在lib目录中来控制Flink使用哪个版本。
Bootstrap ZooKeeper
如果没有正在运行的ZooKeeper安装,则可以使用Flink附带的帮助程序脚本。
conf / zoo.cfg中有一个ZooKeeper配置模板。 您可以配置主机以在server.X条目上运行ZooKeeper,其中X是每个服务器的唯一ID:
脚本bin / start-zookeeper-quorum.sh将在每个配置的主机上启动ZooKeeper服务器。 启动的进程通过Flink包装器启动ZooKeeper服务器,该包装器从conf / zoo.cfg中读取配置,并确保设置一些必需的配置值以方便使用。 在生产设置中,建议管理您自己的ZooKeeper安装。