1、名词及术语
术语说明
术语
说明
HDFS
Hadoop分布式文件系统
Hive
基于Hadoop的数据仓库工具
Yarn
资源管理器
RDD
弹性分布式数据集
Partition
RDD的数据分区,可并行计算
NarrowDependency
窄依赖,子RDD依赖于父RDD中固定的Partition,如map转换
WideDependency
宽依赖,子RDD依赖于父RDD中所有的Partition,如groupByKey
DAG
有向无环图,用于反映各RDD之间的依赖关系
yarn
Spark程序基于yarn的运行模式
2.1、开发流程
编写Spark程序à打包成Jar包à提交到Spark集群执行
2.1、开发前准备
(1)安装jdk1.7+
(2)安装apache-maven-3.3.9
(3)可视化开发软件ideaIU
(4)scala 2.11.x
2.2、源码示例
Spark源码是基于Scala进行开发的,因此本文档中的代码示例均采用Scala演示
1.配置ideaIU,安装scala Plugins为下载的scala 2.11.x
2.创建maven工程并配置文件pom.xml
3.Spark程序开发
(1)初始化Spark
创建SparkSession对象,包含了应用程序的信息,可以设置应用程序名称以及运行模式(也可以在运行程序时设置该参数),SparkSession作为spark 2.0引入的新的切入点,包含了SQLContext和HiveContext的功能,在build上创建的参数会自动传入到spark和hadoop
(1)弹性分布式数据集(RDD)和DataSet
Spark 1.x程序的核心是围绕RDD进行的变换,2.0版本统一了DataFrames和DataSet,
DataFrames只是行(Row)数据集的typealias了,并且以DataFrame为重点的机器学习包spark.ml逐渐作为主api出现
①创建RDD
RDD有两种创建方式:
一是从普通数组或List列表创建,例:
valinit_rdd=sparkSession.SparkContext.parallelize(1to9,3)
二是从外部数据源如HDFS:
valinit_rdd=sparkSession.SparkContext.textFile("/recordSystem/sourceFile.txt")
②创建DataSet和DataFrame
使用sparkSession.range创建一个DataSet
valinit_ds=sparkSession.range(5,100,5)
从hive中创建DataFrame
valinit_df=sparkSession.sql("select * from fact_vod")
(2)RDD操作
目前程序的编写还是以RDD为主, DataFrame的转换与RDD的类似.
RDD通过一系列的变换,得到数据分析的结果,如统计一个文本中单词的个数,单词间以空格切分
valinit_rdd=sc.textFile("/recommendSystem/file.txt")
init_rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)
(4)RDD结果存储
经过RDD转换后得到的数据结果可以存储到HDFS文件系统或者hive表中
①存储到文件系统,路径为/recommendSystem:
result_rdd.saveAsTextFile("/recommendSystem")
②存储到hive表,表名为result_table
importorg.apache.spark.sql.SaveMode
result_rdd.toDF().write.mode(SaveMode.Append).saveAsTable("result_table")
2.3、实际案例
程序实现功能:推荐引擎离线计算服务数据预处理
2.4、提交运行
将打包好的Spark程序Jar包发送到Spark集群的master节点或者slave节点上,并执行以下命令运行程序:
SPARK_HOME/bin/spark-submit--master yarn –-deploy-mode client--class CLASS_PATHJAR_PATH ARGS
其中,
SPARK_HOM为集群中Spark的安装目录
CLASS_PATH为主程序入口:包名.Object名
JAR_PATH打包的jar包存放的位置
ARGS程序执行所需传入的参数
如果在程序中设置了master,则在submit的时候可以省略master参数,否则必须添加
上面实际案例的提交命令为:
/opt/spark/curr_spark/bin/spark-submit --deploy-mode client--class com.zhangjian.recommendation.collaborativeFilter.CFDataPretreatment/root/spark_program/recommend-system_1.0-1.0-SNAPSHOT.jar
�w�,vB6