Spark
SparkContext template
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
val lines = sc.textFile("path")
sc.stop()
SparkSession template
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
Overview:
1. What is Spark: a lightning-fast unified analytics engine.
2. Spark characteristics (Spark makes heavy use of memory; MapReduce uses a multi-process model while Spark uses a multi-threaded model):
1) Fast: Spark implements an efficient DAG execution engine and processes data streams efficiently in memory.
2) Easy to use: APIs for Scala, Java, Python, and R, plus more than 80 high-level operators, let users build different applications quickly. The interactive Python and Scala shells make it very convenient to try out solutions against a Spark cluster.
3) General-purpose: Spark provides a unified stack covering batch processing, interactive queries (Spark SQL), real-time stream processing (Spark Streaming), machine learning (Spark MLlib), and graph computation (GraphX). These workloads can be combined seamlessly in a single application, reducing development and maintenance cost.
4) Good compatibility: Spark integrates easily with other open-source products. It can use YARN or Mesos for resource management and scheduling, and it can process any data source Hadoop supports, including HDFS, HBase, and Cassandra. This matters for users who already run Hadoop clusters, because they can use Spark's processing power without migrating any data. Spark can also run without a third-party resource manager: it ships with Standalone as a built-in resource management and scheduling framework, which further lowers the barrier to deploying and using Spark. In addition, Spark provides tools for deploying a Standalone Spark cluster on EC2.
3. Spark runtime architecture
Cluster Manager: manages the cluster's resources (Standalone, YARN, Mesos)
Worker Node: a worker machine that manages local resources
Driver Program: runs the application's main method and creates the SparkContext; the Cluster Manager allocates resources, and the SparkContext sends tasks to the executors for execution
Executor: runs on a worker node, executes the tasks sent by the driver, and reports results back to the driver
Spark supports three cluster deployment modes: Standalone, YARN, and Mesos.
1. Standalone mode
Standalone mode ships with a complete set of services and can be deployed to a cluster on its own, without depending on any external resource management system. To some extent it is the foundation for the other two modes.
Cluster Manager: Master
Worker Node: Worker
Supports only coarse-grained resource allocation
2. Spark on YARN mode
YARN has strong community support and has gradually become the standard resource manager for big data clusters.
In China it is the most widely used deployment mode in production environments.
Spark on YARN supports two modes:
yarn-cluster: suited to production environments
yarn-client: suited to interactive use and debugging, when you want to see the application's output immediately
Cluster Manager: ResourceManager
Worker Node: NodeManager
Supports only coarse-grained resource allocation
3. Spark on Mesos mode
The officially recommended mode; Mesos support was considered from the very beginning of Spark's development.
Spark runs more flexibly and more naturally on Mesos than on YARN.
Cluster Manager: Mesos Master
Worker Node: Mesos Slave
Supports both coarse-grained and fine-grained resource allocation
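The cluster manager is ultimately chosen through the master URL. A minimal sketch (hostnames and ports are placeholders, not from these notes); in practice the master is usually passed to spark-submit via --master rather than hard-coded:
import org.apache.spark.sql.SparkSession

// Standalone: "spark://master-host:7077"
// YARN:       "yarn" (client vs. cluster deploy mode is chosen at submit time)
// Mesos:      "mesos://mesos-master:5050"
val spark = SparkSession.builder()
  .appName("DeployModeDemo")
  .master("spark://master-host:7077") // swap for "yarn" or "mesos://..." as needed
  .getOrCreate()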
Learning path: Spark RDD -> SchemaRDD (DataFrame) ->
Local mode
Local mode runs on a single machine and is mainly used for testing or experiments. It is the simplest run mode: all processes run inside a single JVM on one machine.
Local mode uses multiple threads on one machine to simulate Spark's distributed computation, and is usually used to verify that the application logic is correct.
This mode is very simple to set up: unpack the Spark distribution and adjust a few common settings. There is no need to start Spark's Master or Worker daemons, nor any Hadoop services (unless HDFS is used).
local: run the job with a single thread on the local machine;
local[N]: start N threads;
local[*]: use all cores on the machine;
local[N,M]: the first parameter is the number of cores (threads) to use; the second is the number of allowed task failures. The modes above do not specify M; its default value is 1.
Example: run SparkPi, then a word count against an HDFS file in spark-shell:
run-example SparkPi 10
spark-shell
// HDFS file
val lines = sc.textFile("/wcinput/wc.txt")
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
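A sketch of local[N,M] set from code (the thread and failure counts below are arbitrary):
import org.apache.spark.{SparkConf, SparkContext}

// local[4,2]: 4 worker threads, up to 2 failures allowed per task
val conf = new SparkConf().setAppName("LocalModeDemo").setMaster("local[4,2]")
val sc = new SparkContext(conf)
println(sc.defaultParallelism) // 4 in this configuration
sc.stop()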
1.6 Differences between Spark and MapReduce
| MapReduce | Spark |
| ------------------------------------------------ | -------------------------------------------------------- |
| Data storage: splits of HDFS files on disk | Builds in-memory resilient distributed datasets (RDDs) for computation and caching |
| Programming model: only Map and Reduce operations, limited expressiveness | Rich set of operators that keeps data-processing code very short |
| Intermediate results are written to disk; I/O and (de)serialization are costly | Intermediate results stay in memory, which is orders of magnitude faster to access than disk |
| Tasks run as processes, and starting a JVM is slow | Tasks run as threads, so reading small datasets can reach sub-second latency |
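To illustrate the first row (in-memory RDD reuse), a minimal sketch; the file path is a placeholder:
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("CacheDemo").setMaster("local[*]"))
val words = sc.textFile("/wcinput/wc.txt")
  .flatMap(_.split(" "))
  .cache() // keep the RDD in memory after the first action computes it
println(words.count())            // first action: reads from disk, then caches
println(words.distinct().count()) // later actions reuse the cached partitions
sc.stop()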
Spark RDD notes
RDD properties:
A list of partitions
A compute function that computes each partition
A list of dependencies on other RDDs (wide or narrow dependencies)
Optionally, a Partitioner for key-value RDDs
Optionally, a list of preferred locations for computing each partition
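Most of these properties can be inspected directly on an RDD; a quick sketch, assuming an existing SparkContext sc:
import org.apache.spark.HashPartitioner

val pairs = sc.makeRDD(Seq(("a", 1), ("b", 2), ("a", 3)), 4)
  .partitionBy(new HashPartitioner(4))
println(pairs.getNumPartitions)                        // the list of partitions
println(pairs.partitioner)                             // the optional Partitioner of a key-value RDD
println(pairs.dependencies)                            // dependencies on parent RDDs
println(pairs.preferredLocations(pairs.partitions(0))) // preferred locations (empty in local mode)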
Common Transformations
| Transformation | Meaning |
| :----------------------------------------------------------- | :----------------------------------------------------------- |
| map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. Applies func to every element of the RDD and returns a new distributed RDD of the results. |
| filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. Keeps only the elements for which func returns true and forms a new RDD from them. |
| flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). Here func returns a Seq: each element is mapped to 0..n elements, which are flattened into a new RDD. |
| mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. Like map, except func is applied to a whole partition at a time rather than to each element; more efficient when memory is sufficient. |
| mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
| sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
| union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
| intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. |
| distinct([numPartitions])) | Return a new dataset that contains the distinct elements of the source dataset. |
| groupByKey([numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks. |
| reduceByKey(func, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
| aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
| sortByKey([ascending], [numPartitions]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. |
| join(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. |
| cogroup(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith. |
| cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
| pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings. |
| coalesce(numPartitions) | Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. |
| repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. |
| repartitionAndSortWithinPartitions(partitioner) | Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery. |
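A few of the transformations above in action (a sketch, assuming an existing SparkContext sc):
val nums = sc.makeRDD(1 to 10, 3)
println(nums.map(_ * 2).collect().toList)                        // map
println(nums.filter(_ % 2 == 0).collect().toList)                // filter
println(nums.flatMap(n => Seq(n, n * 10)).collect().toList)      // flatMap
println(nums.coalesce(1).getNumPartitions)                       // coalesce: shrink to 1 partition

val pairs = sc.makeRDD(Seq(("a", 1), ("b", 2), ("a", 3)))
println(pairs.reduceByKey(_ + _).collect().toList)               // reduceByKey -> (a,4), (b,2)
println(pairs.groupByKey().mapValues(_.toList).collect().toList) // groupByKey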
Differences between map and mapPartitions
map: processes one record at a time.
mapPartitions: processes one partition at a time; the partition's data can only be released after the whole partition has been processed, so it can easily cause OOM when memory is tight.
Best practice: when memory is sufficient, prefer mapPartitions for better efficiency.
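A small sketch of the difference (assuming an existing SparkContext sc); the setup comment marks where, e.g., a database connection could be reused per partition:
val rdd = sc.makeRDD(1 to 100, 4)

// map: the function runs once per element
val viaMap = rdd.map(_ * 2)

// mapPartitions: the function runs once per partition and receives the partition's iterator,
// so per-partition setup cost is amortized, but the whole partition stays referenced until done
val viaPartitions = rdd.mapPartitions { iter =>
  // per-partition setup (e.g. open a connection) would go here
  iter.map(_ * 2)
}
println(viaMap.sum())
println(viaPartitions.sum())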
Differences between the collection sort methods
sorted sorts in ascending (natural) order; sortWith takes a comparison function returning Boolean; sortBy sorts by a key derived from each element.
println(s"sorted = ${l.sorted} sortwith = ${l.sortWith(_<_)} sortby = ${l.sortBy(_+1)}")
RDD serialization
In real projects you often define custom operations over RDDs. Keep in mind:
Initialization happens on the Driver.
The actual computation runs on the Executors.
This means objects cross process boundaries, so they must be serializable.
As a simplification, the SparkContext can be thought of as representing the Driver.
import org.apache.spark.{SparkConf, SparkContext}

object SerializableDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)

    // Local functions defined in the driver are serializable when captured in a closure
    def add1(x: Int) = x + 100
    val add2 = add1 _

    val my1 = new MyClass1(20)
    val my2 = new MyClass2(20)
    val my3 = new MyClass3(20)

    // Referencing my3 in the closure fails: Caused by: java.io.NotSerializableException: MyClass3
    // rdd1.map(m => m + my3.num).foreach(println)
    rdd1.map(m => m + my2.num).foreach(println)

    sc.stop()
  }
}

// A plain class is not serializable, so closing over an instance of it fails on the executors
class MyClass3(x: Int) {
  val num = x
}
// Fix 1: case classes are serializable by default
case class MyClass1(x: Int) {
  val num = x
}
// Fix 2: make the class extend Serializable explicitly
class MyClass2(x: Int) extends Serializable {
  val num = x
}
Regular expressions in Spark
Spark SQL
Advantages:
Code is concise; Spark SQL stores table data in memory in a columnar format instead of as native JVM objects.
Spark SQL provides two new abstractions:
1) DataFrame: like an RDD, a DataFrame is also a distributed dataset
2) Dataset
Official function reference: http://spark.apache.org/docs/latest/api/sql/index.html#power
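A minimal sketch of the two abstractions (class and column names are made up for illustration):
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int)

val spark = SparkSession.builder().appName("DfDsDemo").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age") // DataFrame = Dataset[Row]
val ds = Seq(Person("Alice", 30), Person("Bob", 25)).toDS()  // Dataset[Person]: typed API
df.filter($"age" > 26).show()
ds.filter(_.age > 26).show()
spark.stop()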
Implementing the KNN algorithm with Spark SQL
Prepare the dataset and split the data below into two parts: 1) the sample set; 2) 3-5 records cut out of each class in the sample, used later as test points to verify the classification.
Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
1,5.1,3.5,1.4,0.2,Iris-setosa
2,4.9,3,1.4,0.2,Iris-setosa
3,4.7,3.2,1.3,0.2,Iris-setosa
4,4.6,3.1,1.5,0.2,Iris-setosa
5,5,3.6,1.4,0.2,Iris-setosa
6,5.4,3.9,1.7,0.4,Iris-setosa
7,4.6,3.4,1.4,0.3,Iris-setosa
8,5,3.4,1.5,0.2,Iris-setosa
9,4.4,2.9,1.4,0.2,Iris-setosa
10,4.9,3.1,1.5,0.1,Iris-setosa
11,5.4,3.7,1.5,0.2,Iris-setosa
12,4.8,3.4,1.6,0.2,Iris-setosa
13,4.8,3,1.4,0.1,Iris-setosa
14,4.3,3,1.1,0.1,Iris-setosa
15,5.8,4,1.2,0.2,Iris-setosa
16,5.7,4.4,1.5,0.4,Iris-setosa
17,5.4,3.9,1.3,0.4,Iris-setosa
18,5.1,3.5,1.4,0.3,Iris-setosa
19,5.7,3.8,1.7,0.3,Iris-setosa
20,5.1,3.8,1.5,0.3,Iris-setosa
21,5.4,3.4,1.7,0.2,Iris-setosa
22,5.1,3.7,1.5,0.4,Iris-setosa
23,4.6,3.6,1,0.2,Iris-setosa
24,5.1,3.3,1.7,0.5,Iris-setosa
25,4.8,3.4,1.9,0.2,Iris-setosa
26,5,3,1.6,0.2,Iris-setosa
27,5,3.4,1.6,0.4,Iris-setosa
28,5.2,3.5,1.5,0.2,Iris-setosa
29,5.2,3.4,1.4,0.2,Iris-setosa
30,4.7,3.2,1.6,0.2,Iris-setosa
31,4.8,3.1,1.6,0.2,Iris-setosa
32,5.4,3.4,1.5,0.4,Iris-setosa
33,5.2,4.1,1.5,0.1,Iris-setosa
34,5.5,4.2,1.4,0.2,Iris-setosa
35,4.9,3.1,1.5,0.1,Iris-setosa
36,5,3.2,1.2,0.2,Iris-setosa
37,5.5,3.5,1.3,0.2,Iris-setosa
38,4.9,3.1,1.5,0.1,Iris-setosa
39,4.4,3,1.3,0.2,Iris-setosa
40,5.1,3.4,1.5,0.2,Iris-setosa
41,5,3.5,1.3,0.3,Iris-setosa
42,4.5,2.3,1.3,0.3,Iris-setosa
43,4.4,3.2,1.3,0.2,Iris-setosa
44,5,3.5,1.6,0.6,Iris-setosa
51,7,3.2,4.7,1.4,Iris-versicolor
52,6.4,3.2,4.5,1.5,Iris-versicolor
53,6.9,3.1,4.9,1.5,Iris-versicolor
54,5.5,2.3,4,1.3,Iris-versicolor
55,6.5,2.8,4.6,1.5,Iris-versicolor
56,5.7,2.8,4.5,1.3,Iris-versicolor
57,6.3,3.3,4.7,1.6,Iris-versicolor
58,4.9,2.4,3.3,1,Iris-versicolor
59,6.6,2.9,4.6,1.3,Iris-versicolor
60,5.2,2.7,3.9,1.4,Iris-versicolor
61,5,2,3.5,1,Iris-versicolor
62,5.9,3,4.2,1.5,Iris-versicolor
63,6,2.2,4,1,Iris-versicolor
64,6.1,2.9,4.7,1.4,Iris-versicolor
65,5.6,2.9,3.6,1.3,Iris-versicolor
66,6.7,3.1,4.4,1.4,Iris-versicolor
67,5.6,3,4.5,1.5,Iris-versicolor
68,5.8,2.7,4.1,1,Iris-versicolor
69,6.2,2.2,4.5,1.5,Iris-versicolor
70,5.6,2.5,3.9,1.1,Iris-versicolor
71,5.9,3.2,4.8,1.8,Iris-versicolor
72,6.1,2.8,4,1.3,Iris-versicolor
73,6.3,2.5,4.9,1.5,Iris-versicolor
74,6.1,2.8,4.7,1.2,Iris-versicolor
75,6.4,2.9,4.3,1.3,Iris-versicolor
76,6.6,3,4.4,1.4,Iris-versicolor
77,6.8,2.8,4.8,1.4,Iris-versicolor
78,6.7,3,5,1.7,Iris-versicolor
79,6,2.9,4.5,1.5,Iris-versicolor
80,5.7,2.6,3.5,1,Iris-versicolor
81,5.5,2.4,3.8,1.1,Iris-versicolor
82,5.5,2.4,3.7,1,Iris-versicolor
83,5.8,2.7,3.9,1.2,Iris-versicolor
84,6,2.7,5.1,1.6,Iris-versicolor
85,5.4,3,4.5,1.5,Iris-versicolor
86,6,3.4,4.5,1.6,Iris-versicolor
87,6.7,3.1,4.7,1.5,Iris-versicolor
88,6.3,2.3,4.4,1.3,Iris-versicolor
89,5.6,3,4.1,1.3,Iris-versicolor
90,5.5,2.5,4,1.3,Iris-versicolor
91,5.5,2.6,4.4,1.2,Iris-versicolor
92,6.1,3,4.6,1.4,Iris-versicolor
93,5.8,2.6,4,1.2,Iris-versicolor
94,5,2.3,3.3,1,Iris-versicolor
95,5.6,2.7,4.2,1.3,Iris-versicolor
96,5.7,3,4.2,1.2,Iris-versicolor
97,5.7,2.9,4.2,1.3,Iris-versicolor
101,6.3,3.3,6,2.5,Iris-virginica
102,5.8,2.7,5.1,1.9,Iris-virginica
103,7.1,3,5.9,2.1,Iris-virginica
104,6.3,2.9,5.6,1.8,Iris-virginica
105,6.5,3,5.8,2.2,Iris-virginica
106,7.6,3,6.6,2.1,Iris-virginica
107,4.9,2.5,4.5,1.7,Iris-virginica
108,7.3,2.9,6.3,1.8,Iris-virginica
109,6.7,2.5,5.8,1.8,Iris-virginica
110,7.2,3.6,6.1,2.5,Iris-virginica
111,6.5,3.2,5.1,2,Iris-virginica
112,6.4,2.7,5.3,1.9,Iris-virginica
113,6.8,3,5.5,2.1,Iris-virginica
114,5.7,2.5,5,2,Iris-virginica
115,5.8,2.8,5.1,2.4,Iris-virginica
116,6.4,3.2,5.3,2.3,Iris-virginica
117,6.5,3,5.5,1.8,Iris-virginica
118,7.7,3.8,6.7,2.2,Iris-virginica
119,7.7,2.6,6.9,2.3,Iris-virginica
120,6,2.2,5,1.5,Iris-virginica
121,6.9,3.2,5.7,2.3,Iris-virginica
122,5.6,2.8,4.9,2,Iris-virginica
123,7.7,2.8,6.7,2,Iris-virginica
124,6.3,2.7,4.9,1.8,Iris-virginica
125,6.7,3.3,5.7,2.1,Iris-virginica
126,7.2,3.2,6,1.8,Iris-virginica
127,6.2,2.8,4.8,1.8,Iris-virginica
128,6.1,3,4.9,1.8,Iris-virginica
129,6.4,2.8,5.6,2.1,Iris-virginica
130,7.2,3,5.8,1.6,Iris-virginica
131,7.4,2.8,6.1,1.9,Iris-virginica
132,7.9,3.8,6.4,2,Iris-virginica
133,6.4,2.8,5.6,2.2,Iris-virginica
134,6.3,2.8,5.1,1.5,Iris-virginica
135,6.1,2.6,5.6,1.4,Iris-virginica
136,7.7,3,6.1,2.3,Iris-virginica
137,6.3,3.4,5.6,2.4,Iris-virginica
138,6.4,3.1,5.5,1.8,Iris-virginica
139,6,3,4.8,1.8,Iris-virginica
140,6.9,3.1,5.4,2.1,Iris-virginica
141,6.7,3.1,5.6,2.4,Iris-virginica
142,6.9,3.1,5.1,2.3,Iris-virginica
143,5.8,2.7,5.1,1.9,Iris-virginica
144,6.8,3.2,5.9,2.3,Iris-virginica
How the KNN algorithm works:
1) Prepare and preprocess the data.
2) Compute the distance from the test point (the point to classify, e.g. the black square in the usual illustration) to every sample point (the circles and stars).
3) Sort the distances and keep the K points with the smallest distances (those inside the circle around the test point).
4) Compare the classes of those K points and, by majority vote, assign the test point to the class that appears most often among them.
For example, if the K nearest neighbours contain four stars and three circles, the black square is classified as a star.
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.{DataFrame, SparkSession}

object lrisCateFind {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getCanonicalName.init)
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Schemas specifying the column types for the two CSV files
    val schema = "Id Int,SepalLengthCm float,SepalWidthCm float,PetalLengthCm float,PetalWidthCm float,Species String"
    val schemaT = "tId Int,tSepalLengthCm float,tSepalWidthCm float,tPetalLengthCm float,tPetalWidthCm float,tSpecies String"

    // Read the sample CSV file
    val sample: DataFrame = spark.read
      .option("header", "true")
      .schema(schema)
      .csv("src/data/Iris.csv")
    // Read the test-point CSV file
    val testPoint = spark.read
      .option("header", "true")
      .schema(schemaT)
      .csv("src/data/lris_sub.csv")

    // Cross join every sample with every (broadcast) test point and compute the Euclidean distance
    sample.crossJoin(broadcast(testPoint))
      .selectExpr("sqrt(power(SepalLengthCm - tSepalLengthCm, 2) + power(SepalWidthCm - tSepalWidthCm, 2) + power(PetalLengthCm - tPetalLengthCm, 2) + power(PetalWidthCm - tPetalWidthCm, 2)) as distance",
        "tId", "tSpecies", "Species")
      .createOrReplaceTempView("crossResult")

    // For each test point: keep the 9 nearest samples (K = 9), count neighbours per class,
    // and pick the class with the highest count (majority vote)
    spark.sql(
      """select *, first_value(Species) over(partition by tId order by num desc) n
        |  from
        |  (select tId, max(tSpecies) as tSpecies, Species, count(1) as num
        |     from
        |     (select *, row_number() over(partition by a.tId order by distance) as b
        |        from crossResult a)
        |    where b <= 9
        |    group by tId, Species)
        |""".stripMargin).show()

    spark.stop()
  }
}
}