引子
Spark提供了一个名为:stand a lone模式的的简单模式,来运行集群。这个模式中Cluster Manager由Spark自身提供(Spark-Master)。我们可以有两种方式来启动集群:手工方式和使用内置脚本方式
安装集群
Apache网站上提供了Spark的编译版本下载链接,我们可以把这个版本下载到本地,复制到所有机器上,然后解压。另外一种方式是下载源代码自己编译,然后把构建结果复制到所有机器上再解压。*安装完毕后,我们启动集群就可以了。此时会有两个选项,一个是手工启动集群,一个是使用配置的方式启动集群。这两个的区别就在于,master和slave(work node)是在命令行里指定的还是在配置文件里指定的。*
手工启动集群
启动masters
bin/start-master.sh在启动过程中,会输出 spark://HOST:PORT (默认的是 spark://localhost:8080) 来让我们知道,当前Spark Master对外提供服务的URL。后面我们启动work node和编写提交Spark应用的时候都需要用到这个URL
启动work node
sbin/start-slave.sh <这里写的就是启动master时,那边打印出来的URL>启动work node之后,我们可以在master的WEB UI上查看下当前集群内的节点,每个节点都会列出来当前机器上可以让Spark使用的CPU和内存数量。
其他的参数
启动master和work node的时候还有一些参数可以指定(未包含原文中标注已经过期的参数)
-h/--host HOST :监听用的主机名
-p/--port PORT :监听的端口,master默认是7077 work node是随机的
--webui-port PORT :WEBUI使用的端口,我们用浏览器可以查看一些信息。master默认8080, work node默认8081
-c/--cores CORES:当前Work Node可供Spark应用使用的CPU数量(默认是全部),这个参数只对Work Node有效
-m/--memory MEMORY:当前Work Node可供Spark应用使用的内存总数。可以用M或G结尾来标注单位。默认是当前集群全部内存减1G,这个参数只对Work Node有效
-d/--work-dir DIR:当前Work Node用来存储临时数据和输出日志文件的路径,默认是在spark安装目录的work目录内,这个参数只对Work Node有效
--properties-file FILE:spark配置文件的存储路径。默认是spark安装目录的 conf/spark-default.conf
配置文件方式启动集群
这种方式不支持Windows,Windows下,需要使用手工方式启动集群使用配置文件方式启动集群,需要在spark的安装目录里写个文件 conf/slaves,这个文件里要把每个work node的主机名写上,一个写一行。如果没写这个文件,启动的时候,Spark集群就只启动当前一台机器。Master 节点和所有Work Node通信通过SSH免密码登录方式完成。如果没有免密码登录方式,可以设置环境变量SPARK_SSH_FOREGROUND并为每个work node配置密码(这个我没做过)。这个文件写好以后,可以使用sbin目录下的脚本来启动Spark集群了
- sbin/start-master.sh 启动master节点
- sbin/start-slaves.sh 启动所有的work node节点
- sbin/start-slave.sh 在当前机器上启动work node节点
- sbin/start-all.sh 启动所有节点(master和work node)
- sbin/stop-master.sh 关闭master节点
- sbin/stop-slaves.sh 关闭所有work node节点
- sbin/stop-all.sh 关闭所有节点
注意,启动的操作需要在你希望成为master的节点那台机器上做(第三条除外)。我们还可以通过设置环境变量的方式来配置Spark集群。这个文件是 conf/spark-env.sh 。 Spark默认提供了一个模板(conf/spark-env.sh.template)给我们使用,复制重命名然后编辑保存就可以去启动集群了。可配置的环境变量如下
SPARK_MASTER_HOST:绑定Master到一个固定的主机名或者IP SPARK_MASTER_PORT:把Master绑定到一个指定的端口上,默认是7077
SPARK_MASTER_WEBUI_PORT :给Master的WEB UI指定一个端口,默认是8080
SPARK_MASTER_OPTS:给Master单独配置的properties,使用"-D名称=值"的方式。下面有可选参数列表
SPARK_LOCAL_DIRS:Spark各个机器的本地存储路径。比如Map输出的结果,保存到磁盘的RDD等等。这个存储越快越好。有多个的时候用逗号分隔
SPARK_WORKER_CORES:每台机器(work node)上可以让Spark集群使用的CPU数量,默认是全部
SPARK_WORKER_MEMORY:每台机器(work node)可以让Spark集群使用的内存数量,默认是机器全部内存减1G。每个Spark应用在一个Executor中可以独立使用的内存用spark.executor.memory配置
SPARK_WORKER_PORT:Work Node的工作端口,默认是随机的
SPARK_WORKER_WEBUI_PORT:Work Node的WEB UI端口,默认是8081
SPARK_WORKER_DIR:运行Spark应用的目录 SPARK_WORKER_OPTS:和SPARK_MASTER_OPTS一样,只是应用于Work Node
SPARK_DAEMON_MEMORY:给Spark Master和 Work Node这两个节点的后台进程分配的内存,默认是1G
SPARK_DAEMON_JAVA_OPTS:给Spark Master和 Work Node这两个节点的后台进程配置的JVM参数
SPARK_PUBLIC_DNS:集群的DNS名称
SPARK_MASTER_OPTS支持如下属性
spark.deploy.retainedApplications:默认值200。WEB UI上能看到的已完成Spark应用的数量。超过这个数量的Spark应用,在WEB UI上看不到
spark.deploy.retainedDrivers:默认值200。对Driver Progame也一样
spark.deploy.spreadOut:默认值true。stand a lone 模式下Spark 把任务发给所有的节点,或者把任务聚合下尽量少的发给节点。发给所有的节点对操作HDFS数据会更高效,后者对计算密集型程序更高效
spark.deploy.defaultCores:默认值(infinite)无限。stand a lone模式下当Spark应用没设置spark.core.max时,使用这个值作为每个应用分配的CPU数量。如果不设置这个值,一个应用就会获取到所有的CPU。设置这个值可以避免多用户使用集群时,一个Spark应用把所有CPU占住
spark.deploy.maxExecutorRetries:默认值10。Executor失败重试次数,一个Spark应用有正在执行的Executor的话,就不会被移除掉。当多个Executor重试执行某个任务连续失败超过这个数值时,并且当前Spark应用没有其他Executor在执行,则cluster manager就会把这个应用移除,并标记为执行失败。如果不开启这个特性,就把这个参数设置为-1。看样子只有stand a lone集群会生效
spark.worker.timeout:默认值60。stand a lone 模式下 work node心跳超时时间 SPARK_WORKER_OPTS支持如下属性
spark.worker.cleanup.enabled:默认值false。是否可以清除worker 或 Spark应用的本地存储目录。stand a lone 模式下才有用,yarn模式下有另外的方式。清理的时候也只清理已经停止的Spark应用
spark.worker.cleanup.interval:默认值1800 (30 minutes)。清除程序的启动周期
spark.worker.cleanup.appDataTtl:默认值7 * 24 * 3600(7 days)。work node上存储的每个Spark应用的日志和jar包的清除周期
spark.worker.ui.compressedLogFileLengthCacheSize:默认值100。这个真心没看明白
Spark应用连接到集群
要到Spark集群上运行一个Spark应用,最简单的方式是设置Spark Master的spark://IP:PORT 到SparkContext中交互式的Spark Shell使用如下命令执行./bin/spark-shell --master spark://IP:PORT
提交Spark应用到集群
spark-submit 脚本提供了最直接的提交spark应用到集群的方式。stand a lone集群中,有两种运行模式:client和cluster。client模式中,driver program和提交Spark应用的程序运行在本地的同一个进程中。cluster模式中driver program运行在一个work node上,client程序接收到应用提交成功的消息就会退出。用spark-submit 提交的应用程序中的jar会被分发到所有的work node上去,应用程序依赖的其他jar需要使用 -jar 参数提交, 多个jar都逗号分隔。 stand a lone模式下,可以支持任务失败的自动重新执行,需要在使用spark-submit 时指定 --supervise参数想要杀死一个正在执行的Spark应用,可以用如下命令操作./bin/spark-class org.apache.spark.deploy.Client killdriverID可以在http://:8080页面中找到
资源分配
stand a lone cluster 模式目前只支持先进先出模式来分配资源。但是在多个Spark应用并发运行的状态下,我们可以控制每个Spark应用获取的最大资源数量,比如利用将spark.core.max参数设置到SparkConf中
val conf = new SparkConf() .setMaster(...) .setAppName(...) .set("spark.cores.max", "10")
val sc = new SparkContext(conf)
可以在集群启动Spark Master时配置spark.deploy.defaultCores 来控制每个Spark应用可以获取到的最大CPU个数。修改 conf/spark-env.sh中的参数export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores="也可以达到同样的目的
监控和日志
Spark stand a lone 模式提供了一个基于web的用户接口来监控集群。Master和每个Worker都有自己的WEB UI来显示集群和Job信息。默认情况下我们可以使用8080端口访问Master的WEB UI。这些配置可以通过 命令行选项来修改。每个Job详细的输出日志会写到每个Work Node 的work目录下,我们可以看到每个Job的stdout和stderr输出
贴近Hadoop运行
我们可以在Hadoop集群上搭建Spark集群,通过设置 hdfs URL例如:hdfs://:/path的方式让Spark从HDFS上拉取数据。当然,Spark集群搭建在完全一个单独的集群上也是可以的,但是会有额外的网络开销用于从HDFS加载数据。
配置网络安全端口
todo
高可用
默认情况下,Spark Master 是存在单点风险的,Work Node本身就是集群中的多节点,可以做到自动HA。stand a lone模式下应对Spark Master的单点风险有两个策略:依赖于ZK的Stand by Master和依赖本地存储的单点恢复。
依赖于ZK的Stand by Master
基于ZK提供的lead 选举和一些状态的存储与监听机制,我们可以在集群里启动多个Master,每个Master都连接到ZK集群中。一个Master被选举为Leader,其他的则进入Stand by状态。当Leader Master宕机之后,其他的Master会做一次选举,选出新的Leader,旧的Master被重启后重新进入到集群中(Stand by状态)。重新选举Leader的过程大概需要1-2分钟,这段时间内不可以提交新的Spark应用到集群运行,正在运行的Spark应用不会受影响。
配置
我们可以设置 spark-env中的SPARK_DAEMON_JAVA_OPTS的spark.deploy.recoveryMode以及相关的spark.deploy.zookeeper.*配置。关于更多的高可用配置,请参考这里如果我们的集群中有多个Master,但是关于高可用的配置不正确,那么有可能出现多个Master都认为自己是Leader的情况,整个集群会陷入一种不“健康”的状态
细节
设置完依赖ZK的高可用集群后,在多台机器上顺次启动多个Master进程就可以了。Master可以在任何时候添加进集群或移出集群。为了给Spark应用顺利的分配计算资源,或者添加Work Node到集群,Work Node需要知道当前处在Leader状态的Master的IP地址。start-slave/s.sh在执行时可以添加一个或者多个master地址信息。可以这样写: spark://host1:port1,host2:port2。这样,SparkContext会尝试注册两个Master,如果host1挂了,就使用host2。
Spark应用或Work Node注册Master和普通的其他操作有一个重要的区别,当启动的时候,应用或者Work Node需要找到并注册当前的Leader Master。一旦成功,这个任务或者Work Node就存在于集群中了。如果此时发生了Master挂掉的问题,新的Leader会通知所有已经注册的应用或Work Node自己是新的Leader,所以它们在启动时可以不知道新的Master的存在。
基于上述描述,新的Master可以随时被创建,添加到集群中来,我们唯一需要关注的就是当Leader发生切换的时候新的Spark应用和Work Node可以找到并注册自己。
依赖本地存储的单点恢复
概述
ZK是实现高可用的一个很好的选择,但如果我们希望当Master挂了的时候,直接去恢复他,文件系统模式(FILESYSTEM mode)可以实现.当Spark 应用和WorkNode注册完毕后,他们的相关信息都可以被写入到本地磁盘中,当Master重启的时候也就可以从磁盘恢复这些信息。
配置
recovery mode需要在spark-env中配置SPARK_DAEMON_JAVA_OPTS的下列属性spark.deploy.recoveryMode:设置股值为 FILESYSTEM 启动Spark的Master后,就会进入到这个模式spark.deploy.recoveryDirectory:这个路径配置好后,Master会把信息存储到这个路径下,唯一要注意的是Master需要这个路径的读写权限
细节
这个模式用在类似串行执行 monitor/manager场景下(原文中有like monit 的链接,但没看明白是干什么),或者手动重启Master
Master关了之后,使用文件系统恢复比什么都不做强一些。这个模式一般用在开发环境或者试验环境。如果我们直接kill掉Master或者使用stop-master脚本关掉Master,但没有清理掉recover state信息。重启Master的时候还是会进入到Recovery模式。这回导致Master的启动比正常情况下慢大概1分钟左右。
我也可以把写入recovery state信息的位置放到NFS上,这样当原来的Master挂了的时候,我们希望在新的机器上启动一个Master,新的Master可以很轻易的获取到Recorvery State信息,完成重启。但是后面再提交Spark应用的时候需要设置新的Master。这个方式不是官方推荐的,但是理论上是可行的。