Spark简介
相较于国内外较多的大数据处理框架,Spark以基低延时的出色表现,正在成为继Hadoop的MapReduce之后,新的,最具有影响的大数据框架之一。以Spark为核心的生态圈,最底层为分布式存储系统HDFS,S3,Hypertable或者其它格式的存储系统如Hbase。资源管理采用Mesos,YARN等集群资源管理模式,或者spark自带的独立运行模式 。Spark sql提供SQL查询服务,性能比Hive快3-50倍。MLIB提供机器学习服务;GraphX提供图计算服务;Spark Streaming将流式计算分解成一系列短小的批处理(Micro Bath)计算。
好处:资源利用率高。多种框架共享资源,使用均衡。实现数据共享。多种框架共享数据和硬件资源,减少数据分散带来的成本。有效降低运维和管理成本:共享模式只需要少量的维护人员.
Spark已经成为整合以大数据应用的标准平台:交互式查询,包括SQL;实时流处理;复杂的分析,包括机器学习,图计算;批处理。
Spark特点:
快速:Spark有先进的DAG执行引擎,支持循环数据流和内存计算;Spark程序在内存中的运行速度 是Hadoop MapReduce运行速度的100倍,在磁盘上的运行速度是Hadoop MapReduce运行速度的10倍。
易用:Spark支持Java,Scala,Python语言。
通用:Spark可以与SQL,Streaming以及复杂的分析良好结合。
有效集成Hadoop:Spark可以指定Hadoop,Yarn的版本来编译出合适的发行版本,Spark也能很容易地运行在EC2,Mesos上,或以Standalong模式运行,并从HDFS,HBase,Cassandra 和其他Hadoop数据源读取数据。
Spark应用场景:
1.快速查询系统,基于日志数据的快速查询系统业务构建于Spark之上,利用其快速查询以及内存表等优势,能够承担大部分日志数据的即时查询工作;在性能方面,普遍比Hive快2-10倍,如果使用内存表的功能,性能将会比hive快百倍。
2.实时日志采集处理,通过Spark Streaming实时进行业务日志采集,快速迭代处理,并进行综合分析,能够满足线上系统分析要求。
3.业务推荐系统,使用spark将业务推荐系统的小时和天级别的模型训练转变为分钟级别的模型训练,有效优化相关排名,个性化推荐以及热点点击分析等 。
4.定制广告系统,在定制广告业务方面需要大数据做应用分析,效果分析,定向优化等,借助spark快速迭代的优势,实现了在“数据实时采集,算法实时训练,系统实时预测”的全流程实时并行高维算法,支持上亿的请求量处理;模拟广告投放计算效率高,延时小,同MapReduce相比延时至少降低一个数量级。
5.用户图计算,利用Graphx解决了许多生产问题,包括以下计算场景;基于度分布的中枢节点发现,基于最大连通图的社区发现,基于三角形计数的关系衡量,基于随机游走的用户属性传播等。
Spark SQL是spark的一个处理结构化数据的模块,提供一个DataFrame编程抽象。它可以看作是一个分布式SQL查询引擎,主要由Catalyst优化,Spark SQL内核,Hive支持三部分组成。
从1.3开始在原有SchemaRDD的基础上提供了与R风格类似的DataFrame API.
DataFrame是以指定列(named columns)组织的分布式数据集合,在Spark SQL中相当于关系数据库的一个表,或R/Python的一个数据框架,但后台更加优化。 DataFrame 支持多种数据源构建,包括:结构化数据文件(Parquet,JSON)加载,HIVE表读取,外部数据库读取,现有RDD转化,以及SQLContext运行sql查询结果创建DataFrame.
Spark Streaming:
Spark Streaming属于核心Spark Api的扩展,它支持高吞吐量和容错的实时流数据处理,它可以接受来自kafka,flume,twitter,zeroMQ或Tcp Socket的数据源,使用复杂的算法表达和高级功能来进行处理,如Map,Reduce,Join, window等,处理的结果数据能够存入文件系统,数据库。
Saprk部署和运行
本地部署模式,独立模式部署,YARN模式部署,以及基于各种模式的应用程序运行。
Spark下载路径:http://spark.apache.org/downloads.html
安装JAVA:
下载并安装好java后,进行配置:在/etc/profile文件中增加变量
sudo vim /etc/profile
export JAVA_HOME=$YOUR_JAVA_HOME# //实际安装路径
export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
如果想立即生效,可以通过运行# source /etc/profile,否则只能在下次用户重新登录加载环境变量时生效。运行 java -version测试。
关于JDK环境变量配置一般包括4种方式:
1.在用户环境变量文件 /etc/profile文件中添加变量,需要具有root权限才能进行配置,对linux下所有用户长期有效。
2.在用户目录下的.profile文件中增加变量,对当前用户长期生效 。
3.直接运行export命令定义变量,只对当前shell临时有效。在shell的命令行下直接使用[export 变量名=变量值]定义变量,该变量只在当前的shell或其子shell下是有效的,若shell关闭,变量则生效,再打开新shell时就没有这个变量,若需要使用,则还需要重新定义 。
4.在系统环境变量 /etc/environment中进行配置。
Spark部署主要包括Local模式部署,Standalone模式部署,YARN模式部署,Mesos模式部署。
Standalone- Spark独立部署意味着Spark占据HDFS(Hadoop分布式文件系统)顶部的位置,Spark 框架自身也自带了完整的资源调度管理服务,可以独立部署到一个集群中。
Spark on YARN - Spark 可以运行于YARN之上,和Hadoop 进行统一部署。资源管理和调度依赖YARN,分布式存储依赖HDFS。
Spark on Mesos - Mesos 是一种资源调度管理框架,可以为运行在它上面的Spark 提供资源调度服务。
ssh免密:
ssh-keygen -t rsa // /root/.ssh/id_rsa
cd /root/.ssh
cp id_rsa.pub authorized_keys
cd /usr/local
ssh-copy-id -i node002 //发送到node002
ssh node002
Spark standalone集群:
/usr/spark/spark3/conf
#mv spark-env.sh.template spark-env.sh
export SPARK_MASTER_IP=node001
export SPARK_MASTER_PORT=7077
export SPARK_EXECUTOR_INSTANCES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=256M
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_CONF_DIR=/usr/spark/spark3/conf
export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
export JRE_HOME=${JAVA_HOME}/jre
mv slaves.template slaves
vi slaves //打开slaves去掉localhost,增加如下内容
node002 #slave1,node001为master
将spark文件夹同步到其他从机
scp -r /usr/spark/spark3 root@node002:/usr/spark/
启动spark
/usr/spark/spark3/sbin/start-all.sh
打开控制台:
在Standalone模式运行Spark应用程序
1.spark-shell运行应用程序
在Spark集群上运行应用程序,需要将Master的spark://ip:port传递给sparkcontext构造函数。
在集群上运行交互式的spark命令spark-shell,该命令将会用spark-env.sh中的SPARK_MASTER_IPD和SPARK_MASTER_PORT自动设置Master.
./bin/spark-shell --master spark://ip:port
2.spark-submit启动应用程序
spark-submit向集群提交spark应用程序比较直接,对于独立部署模式的集群,spark支持Client部署模式,即在提交应用的客户端进程中部署Driver.
应用程序通过spark-submit启动,应用程序的jar包将会自动地分配给所有的Worker节点;对于任何其他运行应用程序时需要依赖的Jar包,可以通过-jar声明,jar包名之间用逗号隔开。
./bin/spark-submit --class xxx.XX --master spark://node001:7077 --executor-memory 2G --total-executor-cores 2 xxxx.xxxx.jar
应用程序提交过程
Spark应用程序在集群上以独立进程集合的形式运行,接受用户Driver程序中main函数sparkcontext对象的协调。当任务提交到集群上,SparkContext对象可以与多种集群管理(Standalone部署,Mesos,YARN模式)连接,这些集群管理器负责为所有应用分配资源。一旦连接建立,Spark可以在集群的节点上获得Executor,这些Executor进程负责执行计算和为应用程序存储数据。
(1)sparkContext向资源管理器注册并申请资源
(2)资源管理器根据预先设定的算法,在资源池里分配合适的Executor运行资源
(3)应用(Main函数里的算子)构建有向无环图
(4)DAGScheduler将图转换成TaskSet
(5)TaskScheduler负责TaskSet的任务分发。
Executor之间可以相互通信。
SparkShell是交互控制台,使用scala程序;./bin/spark-shell --master local[*] //--master用来设置context将要连接并使用的资源主节点,master的值是standalone模式的spark集群地址,Mesos,Yarn集群的url.或者是一个local地址;使用--jars可以添加jar包的路径,逗号隔开多个。spark-shell本质是在后台调用了spark-submit脚本来启动应用程序。
scala> val textFile = sc.textFile("file:///usr/spark/spark3/README.md")
textFile: org.apache.spark.rdd.RDD[String] = file:///usr/spark/spark3/README.md MapPartitionsRDD[1] at textFile at <console>:24
加载HDFS文件和本地文件都是使用textFile,区别是添加前缀(hdfs:// 和 file://)进行标识,从本地文件读取文件直接返回MapPartitionsRDD,而从HDFS读取的文件先转成HadoopRDD,然后隐式转换成MapPartitionsRDD.它们都是Spark的弹性分布式数据集(RDD)。