默认情况下,standalone cluster manager对于worker节点的失败是具有容错性的(迄今为止,Spark自身而言对于丢失部分计算工作是有容错性的,它会将丢失的计算工作迁移到其他worker节点上执行)。然而,调度器是依托于master进程来做出调度决策的,这就会造成单点故障:如果master挂掉了,就没法提交新的应用程序了。为了解决这个问题,spark提供了两种高可用性方案,分别是基于zookeeper的HA方案以及基于文件系统的HA方案。
看基于zookeeper的HA方案
概述
使用zookeeper来提供leader选举以及一些状态存储,你可以在集群中启动多个master进程,让它们连接到zookeeper实例。其中
一个master进程会被选举为leader,其他的master会被指定为standby模式。如果当前的leader master进程挂掉了,其他的
standby master会被选举,从而恢复旧master的状态。并且恢复作业调度。整个恢复过程(从leader master挂掉开始计算)大概
会花费1~2分钟。要注意的是,这只会推迟调度新的应用程序,master挂掉之前就运行的应用程序是不被影响的。
配置
如果要启用这个恢复模式,需要在spark-env.sh文件中,设置SPARK_DAEMON_JAVA_OPTS选项:
spark.deploy.recoveryMode 设置为ZOOKEEPER来启用standby master恢复模式(默认为NONE)
spark.deploy.zookeeper.url zookeeper集群url(举例来说,192.168.114.200:2181,192.168.114.201:2181)
spark.deploy.zookeeper.dir zookeeper中用来存储恢复状态的目录(默认是/spark)
备注:如果在集群中启动了多个master节点,但是没有正确配置master去使用zookeeper,master在挂掉进行恢复时是会失败的,
因为没法发现其他master,并且都会认为自己是leader。这会导致集群的状态不是健康的,因为所有master都会自顾自地去调度。
细节
在启动一个zookeeper集群之后,启用高可用性是很直接的。简单地在多个节点上启动多个master进程,并且给它们相同的zookeeper配置(zookeeper url和目录)。master就可以被动态加入master集群,并可以在任何时间被移除掉。
为了调度新的应用程序或者向集群中添加worker节点,它们需要知道当前leader master的ip地址。这可以通过传递一个master列表来完成。举例来说,我们可以将我们的SparkContext连接的地址指向spark://host1:port1,host2:port2。这就会导致你的SparkContext尝试去注册所有的master,如果host1挂掉了,那么配置还是正确的,因为会找到新的leader master,也就是host2。
对于注册一个master和普通的操作,这是一个重要的区别。当一个应用程序启动的时候,或者worker需要被找到并且注册到当前的leader master的时候。一旦它成功注册了,就被保存在zookeeper中了。如果故障发生了,new leader master会去联系所有的之前注册过的应用程序和worker,并且通知它们master的改变。这样的话,它们甚至在启动的时候都不需要知道new master的存在。
正是由于这个属性,new master可以在任何时间被创建,并且我们唯一需要担心的一件事情就是新的应用程序和worker可以找到并且注册到master。一旦注册上去之后,我们就不用担心它了。
实验
将192.168.114.200机器上的spark集群先停止
./sbin/stop-all.sh修改机器上的spark-env.sh文件,在其中加入上述三个属性
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.114.200:2181,192.168.114.201:2181,192.168.114.202:2181 -Dspark.deploy.zookeeper.dir=/spark"启动集群
在192.168.114.200上直接用启动集群:./sbin/start-all.sh在192.168.114.201上部署spark安装包,并启动一个master进程
修改spark-env.sh文件
export JAVA_HOME=/usr/java/latest
export SCALA_HOME=/usr/local/scala
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SPARK_MASTER_IP=192.168.114.201
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.114.200:2181,192.168.114.201:2181,192.168.114.202:2181 -Dspark.deploy.zookeeper.dir=/spark"
在192.168.0.104上单独启动一个standby master进程:./sbin/start-master.sh
4、提交应用程序
将master地址修改为192.168.114.200:7077,192.168.114.201:7077
如
/opt/module/spark/bin/spark-submit \
--class com.zj.spark.applog.AppLogSpark \
--master spark://spark-project-1:7077,spark-project-2:7077 \
--deploy-mode client \
--num-executors 1 \
--driver-memory 600m \
--executor-memory 600m \
--executor-cores 1 \
--conf spark.cores.max=3 \
/opt/spark-study/mysparkstudy-1.0-SNAPSHOT-jar-with-dependencies.jar \
杀掉原先的leader master,等到standby master接管集群,再次提交应用程序
再次手动启动原来的leader master