一、RDD概念
RDD(Resilient Distributed Dataset):弹性分布式数据集,Spark计算的基石,为用户屏蔽了底层对数据的复杂抽象和处理,为用户提供了一组方便的数据转换与求值方法。
1、RDD是不可变的,如果需要在一个RDD上进行转换操作,则会生成一个新的RDD
2、RDD是分区的,RDD里面的具体数据是分布在多台机器上的Executor里面的。堆内内存和堆外内存+磁盘。
3、RDD是弹性的:
a、存储:Spark会根据用户的配置或者当前Spark的应用运行情况去自动将RDD的数据缓存到内存或者磁盘。它是一个对用户不可见的封装的功能。
b、容错:当你的RDD数据被删除或者丢失的时候,可以通过血统或者检查点机制恢复数据,这个对用户透明的。
c、计算:计算是分层的,有应用->Job->Stage->TaskSet->Task每一层都有对应的计算的保障与重复机制,保障你的计算不会由于一些突发因素而终止。
d、分片:可以根据业务需求或者一些算子来重新调整RDD中的数据分布。
其中Spark Core就是在操作RDD
RDD的创建->RDD的转换->RDD的缓存->RDD的行动->RDD的输出
二、RDD的创建
创建RDD有三种方式:
1、可以从一个Scala集合里面创建:
sc.parallelize(seq):把seq这个数据并行化分片到节点
sc.makeRDD(seq):把seq这个数据并行化分片到节点,它的实现就是parallelize
sc.makeRDD(seq[(T,seq)]):这种方式可以指定RDD的存放位置
2、从外部存储来创建,比如sc.textFile("path")
3、从另外一个RDD转换过来
三、RDD的操作
RDD中操作分为两大类型:转换(transformation)和行动(action)
转换:通过操作将RDD转换成另外一个RDD
行动:将一个RDD进行求值或者输出
所有这些操作主要针对两种类型的RDD:数值RDD和键值对RDD
RDD的所有转换操作都是懒执行的,只有当行动操作出现的时候spark才会真的去运行
常见的转换操作:
1、def map[U: ClassTag](f: T => U): RDD[U] 将函数应用于RDD的每一元素,并返回一个新的RDD
2、def filter(f: T => Boolean): RDD[T] 通过提供的产生boolean条件的表达式来返回符合结果为True的新的RDD
3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。
4、def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。
5、def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning:Boolean = false): RDD[U] 将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。
6、def sample(withReplacement:Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。
7、def union(other: RDD[T]): RDD[T] 将两个RDD中的元素进行合并,返回一个新的RDD
8、def intersection(other:RDD[T]): RDD[T] 将两个RDD做交集,返回一个新的RDD
9、def distinct(): RDD[T] 将当前RDD进行去重后,返回一个新的RDD
10、def partitionBy(partitioner:Partitioner): RDD[(K, V)] 根据设置的分区器重新将RDD进行分区,返回新的RDD。
11、def reduceByKey(func: (V, V) => V): RDD[(K, V)] 根据Key值将相同Key的元组的值用func进行计算,返回新的RDD
12、def groupByKey(): RDD[(K, Iterable[V])] 将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD
13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] 根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。
14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner:Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] 通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。
15、def foldByKey(zeroValue:V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] aggregateByKey的简化操作,seqop和combop相同,
16、def sortByKey(ascending:Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
17、def sortBy[K](f: (T) => K,ascending: Boolean =true,numPartitions: Int =this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 底层实现还是使用sortByKey,只不过使用fun生成的新key进行排序。
18、def join[W](other: RDD[(K, W)], partitioner: Partitioner):RDD[(K, (V, W))] 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD,但是需要注意的是,他只会返回key在两个RDD中都存在的情况。
19、def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner):RDD[(K, (Iterable[V], Iterable[W]))] 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD,注意,如果V和W的类型相同,也不放在一块,还是单独存放。
20、def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] 做两个RDD的笛卡尔积,返回对偶的RDD
21、def pipe(command: String): RDD[String] 对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD,注意,如果你是本地文件系统中,需要将脚本放置到每个节点上。
22、def coalesce(numPartitions:Int, shuffle: Boolean = false,partitionCoalescer:Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null) : RDD[T] 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
23、def repartition(numPartitions:Int)(implicit ord: Ordering[T] = null): RDD[T] 根据你传入的分区数重新通过网络分区所有数据,重型操作。
24、def repartitionAndSortWithinPartitions(partitioner:Partitioner): RDD[(K, V)] 性能要比repartition要高。在给定的partitioner内部进行排序
25、def glom(): RDD[Array[T]] 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
26、def mapValues[U](f: V => U): RDD[(K, U)] 将函数应用于(k,v)结果中的v,返回新的RDD
27、def subtract(other: RDD[T]): RDD[T] 计算差的一种函数去除两个RDD中相同的元素,不同的RDD将保留下来。
常见的行动操作:
1、def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] 抽样但是返回一个scala集合。
2、def reduce(f: (T, T) => T): T 通过func函数聚集RDD中的所有元素
3、def collect(): Array[T] 在驱动程序中,以数组的形式返回数据集的所有元素
4、def count():Long 返回RDD中的元素个数
5、def first():T 返回RDD中的第一个元素
6、def take(num: Int): Array[T] 返回RDD中的前n个元素
7、def takeOrdered(num:Int)(implicit ord: Ordering[T]) 返回前几个的排序
8、def aggregate[U: ClassTag](zeroValue:U)(seqOp: (U,T) =>U, combOp: (U,U) =>U):U aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
9、def fold(zeroValue:T)(op: (T,T) =>T):T 折叠操作,aggregate的简化操作,seqop和combop一样。
10、def saveAsTextFile(path:String):Unit 将RDD以文本文件的方式保存到本地或者HDFS中
11、def saveAsObjectFile(path: String):Unit 将RDD中的元素以序列化后对象形式保存到本地或者HDFS中。
12、def countByKey(): Map[K, Long] 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
13、def foreach(f: T =>Unit):Unit 在数据集的每一个元素上,运行函数func进行更新。
注意:当你在RDD中使用到了class的方法或者属性的时候,该class需要继承java.io.Serializable接口,或者可以将属性赋值为本地变量来防止整个对象的传输。
四、Spark应用提交与调试
1、进入到spark安装目录的bin,调用Spark-submit脚本
2、在脚本后面传入参数(部分)
--class:应用的启动类
--master:集群master的URL(应用运行的模式)
--deploy-mode:是否发布你的驱动到worker节点(cluster)或者作为一个本地客户端(client)(default:client)(提示:client:在master上面新建一个jvm(driver运行其中负责调度回收结果等功能),在worker中申请executor,是jvm独有的;cluster:在其中一个worker上申请excutor和jvm,在其他的worker申请的executor)
--conf:任意的spark配置属性,格式key=vlaue,如果值包含空格,可以加引号“key=value”
application-jar:打包好的应用jar,包含依赖,这个URL在集群中全局可见。比如hdfs://共享存储系统,如果是file://path,那么所有节点的path都包含同样的jar
application-arguments:传给main()方法的参数
五、Spark调试
1、本地调试
【以单节点的方式来运行整个Spark应用】
a、写好你的程序
b、将master设置为local或者local[n]
c、如果你用到了HDFS,可能会遇到winuntils找不到的问题,需要将HADOOP_HOME环境变量加入到IDEA中。
d、打断点
e、Debug模式运行你的程序
2、远程调试
【把IDEA当做你的Driver来运行,保持和整个Spark集群的连接关系】
前提:你的本机和Spark集群是在同一网段。
a、写好你的程序
b、修改Master地址为你的Spark集群的地址
c、将最终需要运行的jar包加入到setJars方法中
d、设置你的本地地址到spark.driver.host 这个变量里面
e、打断点
f、Debug模式运行你的程序