JobManager协调每个Flink部署。它负责调度和资源管理。
默认情况下,每个Flink集群都只有一个JobManager实例。这就造成了一个单点故障(SPOF):如果JobManager崩溃,就不能提交新的程序,运行程序也会失败。
而JobManager高可用性,可以从JobManager失败中恢复,因此消除单点故障。你可以为独立集群和YARN集群配置高可用性。
独立集群高可用性
独立集群的JobManager高可用性的概念是,在任何时候都有一个JobManager leader和多个JobManager备份,以防当leader失败时,可以接管领导权。这就保证了没有单点故障,并且当一个备份JobManager获得领导权时,程序可以继续运行。master和备份JobManager实例之间没有区别。每个JobManager都可以扮演master或备份的角色。
例如,考虑以下有三个JobManager实例的设置:
配置
要启用JobManager的高可用性,你必须设置高可用模式为zookeeper,配置一个ZooKeeper quorum和一个拥有所有JobManager主机的masters文件和它们的web UI端口。
Flink利用 ZooKeeper 来进行所有运行中的JobManager实例的分布式协作。ZooKeeper是Flink之外的单独服务,它通过leader选举和轻量级一致状态存储提供高可靠的分布式协作。ZooKeeper的更多信息请看ZooKeeper启动说明。Flink提供了脚本来启动一个简单的ZooKeeper实例。
Masters文件(masters)
配置conf/masters
目录下的 masters 文件来启动高可用集群:
- masters文件: masters文件包含了所有启动JobManager的主机和与web用户交互绑定的端口
```
jobManagerAddress1:webUIPort1
[...]
jobManagerAddressX:webUIPortX
```
默认情况下,JobManager会调下一个随机端口进行进程间通信。可以通过high-availability.jobmanager.port
修改端口。这个配置接收单个端口(例如,50010
),范围(50000-50025
)或者它们的组合(50010,50011,50020-50025,50050-50075
)。
配置文件 (flink-conf.yaml)
添加下属配置到conf/flink-conf.yaml
中来启动高可用集群:
-
high-availability mode (必要的): 为了开启高可用模式,必须在
conf/flink-conf.yaml
中设置high-availability mode 为zookeeper。high-availability: zookeeper
-
ZooKeeper quorum (必要的): ZooKeeper quorum 是一组ZooKeeper服务,它们提供分布式协调服务。
high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181
每个addressX:port 指向一个ZooKeeper Server,通过给的地址和端口,Flink可以访问它。
-
ZooKeeper root (推荐的): 根ZooKeeper节点,所有的集群节点都放置在该节点下。
high-availability.zookeeper.path.root: /flink
-
ZooKeeper cluster-id (推荐的): 集群id ZooKeeper节点,集群所有需要的协调数据都放在该节点下。
high-availability.cluster-id: /default_ns # important: customize per cluster
重要: 当运行YARN集群时,每个作业都有一个YARN会话,或者其它集群管理时,不应该手动设置这个值。在这些情况下,集群id是基于应用id自动生成的。手动设置集群id会覆盖YARN生成的id。反过来,使用 -z CLI 选项指定一个集群id会覆盖手动配置。如果你在裸机上运行多个Flink高可用集群,则必须为每个集群手动配置独立的集群id。
-
存储目录 (必要的): JobManager的元数据被持久化在文件系统storageDir中,并且只有一个指向该状态的指针存储在ZooKeeper中。
high-availability.zookeeper.storageDir: hdfs:///flink/recovery
The
storageDir
存储恢复失败JobManager时需要的所有元数据。
在配置了masters和 ZooKeeper quorum之后,可以像往常一样使用提供的集群启动脚本。它们将启动一个高可用集群。注意,当你调用脚本时,ZooKeeper quorum必须已经正在运行,并且确保为每个启动的高可用集群配置单独的ZooKeeper根路径。
示例: 具有2个JobManager的独立集群
-
Configure high availability mode and ZooKeeper quorum in
conf/flink-conf.yaml
:high-availability: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availability.zookeeper.path.root: /flink high-availability.cluster-id: /cluster_one # important: customize per cluster high-availability.zookeeper.storageDir: hdfs:///flink/recovery
-
Configure masters in
conf/masters
:localhost:8081 localhost:8082
-
Configure ZooKeeper server in
conf/zoo.cfg
(currently it’s only possible to run a single ZooKeeper server per machine):server.0=localhost:2888:3888
-
Start ZooKeeper quorum:
$ bin/start-zookeeper-quorum.sh Starting zookeeper daemon on host localhost.
-
Start an HA-cluster:
$ bin/start-cluster.sh Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. Starting jobmanager daemon on host localhost. Starting jobmanager daemon on host localhost. Starting taskmanager daemon on host localhost.
-
Stop ZooKeeper quorum and cluster:
$ bin/stop-cluster.sh Stopping taskmanager daemon (pid: 7647) on localhost. Stopping jobmanager daemon (pid: 7495) on host localhost. Stopping jobmanager daemon (pid: 7349) on host localhost. $ bin/stop-zookeeper-quorum.sh Stopping zookeeper daemon (pid: 7101) on host localhost.