1.Spark使用parquet文件存储格式能带来哪些好处?
使用 parquet 主要是对 Spark SQL 查询进行优化,parquet使用列存储,列存储相对于 行存储有下列优点:
数据即索引,查询是可以跳过不符合条件的数据,只读取需要的数据,降低 IO 数据量(行存储没有索引查询时造成大量 IO,建立索引和物化视图代价较大)
只读取需要的列,进一步降低 IO 数据量,加速扫描性能(行存储会扫描所有列)
由于同一列的数据类型是一样的,可以使用高效的压缩编码来节约存储空间
速度更快、极大的减少磁盘I/o、压缩技术非常稳定出色
2.如何理解 Spark Broadcast?
Spark 根据 shuffle 类的算子 划分 Stage(阶段),分为 ShuffleMapStage 和 ResultStage(ResultStage 包含了最后一个 action 操作的算子),这两个 Stage 又分别对应 ShuffleMapTask 和 ResultTask。
在一个 Task 上运行一个函数时,会把函数中涉及到的每个变量,在当前任务上都生成一个副本。
如果 在这个 Task 通过函数产生的变量 比如是个数据表,大小是 1G,其他 Task 也需要用到这个 数据表进行查询等操作,那么如果有 100 个这样的 Task 都用到这个 数据表,就会复制 100 份数据,这样极大的耗费了 网络 IO 和 Exeutor 的内存。
通过 Broadcast,可以把这个 1G 的数据表 拷贝到 Executor,如果恰好 这个 Executor 包含这 100 个 Task,那么这100个Task 只需共用这一份副本即可。
进行传播的变量,会经过序列化,然后在被任务使用时再进行反序列化。
注意:Broadcast 的数据 必须是 只读的。
例子:
可以通过调用 SparkContext.broadcast(v) 来从一个普通变量v中创建一个广播变量。
val v = Array(1, 2, 3)
val broadcastVar = sc.broadcast(v)
broadcastVar.value
//value值为:
res0: Array[Int] = Array(1, 2, 3)
这个广播变量被创建以后,就被广播到 其他 Executor 上。
使用场景:
即上面提到的场景,很多Task 都使用一个变量的情况
join优化,大表join小表的情况,将小表broadcast到executor,可以使用SparkContext进行操作,Spark 1.5版本起DataFrame提供了语法上的支持,如下:
import org.apache.spark.sql.functions.broadcast
// left and right are DataFrames
left.join(broadcast(right), "joinKey")
此外,Spark内部使用Broadcast传输conf到executor。
参考:https://blog.csdn.net/u011564172/article/details/75088331
3.Spark累加器有哪些特点?
1)累加器在全局唯一的,只增不减,记录全局集群的唯一状态
2)在 executor 中修改它,在driver读取,由于累加器的值最终要汇聚到driver端,为了避免 driver端的OOM问题,累加值不应过大
3)是 executor级别 的共享,广播变量是 task 级别的共享
两个application不可以共享累加器,但是同一个app不同的job可以共享
用途:
累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。
如果创建累加器时指定了名字,则可以在Spark UI界面看到,这有利于理解每个执行阶段的进程。
例子:
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value //值为:10
可通过继承 AccumulatorV2 实现自定义累加器
5.spark hashParitioner的弊端是什么?
答:HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,最后返回的值就是这个key所属的分区ID;
弊端是数据不均匀,容易导致数据倾斜,极端情况下某几个分区会拥有rdd的所有数据
6.RangePartitioner分区的原理?
答:RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。其原理是水塘抽样。可以参考这篇博文
https://www.iteblog.com/archives/1522.html
7.介绍parition和block有什么关联关系?
答:1)hdfs中的block是分布式存储的最小单元,等分,可设置冗余,这样设计有一部分磁盘空间的浪费,但是整齐的block大小,便于快速找到、读取对应的内容;2)Spark中的partion是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partion组成的。partion是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partion大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定;3)block位于存储空间、partion位于计算空间,block的大小是固定的、partion大小是不固定的,是从2个不同的角度去看数据。
8.Spark应用程序的执行过程是什么?
1)构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
2).资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
3).SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。
4).Task在Executor上运行,运行完毕释放所有资源。
9.hbase预分区个数和spark过程中的reduce个数相同么
答:和spark的map个数相同,reduce个数如果没有设置和reduce前的map数相同。
10.如何理解Standalone模式下,Spark资源分配是粗粒度的?
答:spark默认情况下资源分配是粗粒度的,也就是说程序在提交时就分配好资源,后面执行的时候
使用分配好的资源,除非资源出现了故障才会重新分配。比如Spark shell启动,已提交,一注册,哪怕没有任务,worker都会分配资源给executor。
11.Spark如何自定义partitioner分区器?
答:1)spark默认实现了HashPartitioner和RangePartitioner两种分区策略,我们也可以自己扩展分区策略,自定义分区器的时候继承org.apache.spark.Partitioner类,实现类中的三个方法
def numPartitions: Int:这个方法需要返回你想要创建分区的个数;
def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1;
equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。
2)使用,调用parttionBy方法中传入自定义分区对象
参考:http://blog.csdn.net/high2011/article/details/68491115
12.spark中task有几种类型?
答:2种类型:1)result task类型,最后一个task,2是shuffleMapTask类型,除了最后一个task都是
13.union操作是产生宽依赖还是窄依赖?
答:窄依赖
14.rangePartioner分区器特点?
答:rangePartioner尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。RangePartitioner作用:将一定范围内的数映射到某一个分区内,在实现中,分界的算法尤为重要。算法对应的函数是rangeBounds
15.什么是二次排序,你是如何用spark实现二次排序的?(互联网公司常面)
答:就是考虑2个维度的排序,先对key排序,然后再对key的 value 列表进行排序。参考博文:http://blog.csdn.net/sundujing/article/details/51399606
https://www.iteblog.com/archives/1819.html
16.如何使用Spark解决TopN问题?(互联网公司常面)
答:主要使用 sortByKey 进行排序,然后 take(N) 即可。常见的面试题,参考博文:http://www.cnblogs.com/yurunmiao/p/4898672.html(这篇不好)
17.如何使用Spark解决分组排序问题?(互联网公司常面)
组织数据形式:
aa 11
bb 11
cc 34
aa 22
bb 67
cc 29
aa 36
bb 33
cc 30
aa 42
bb 44
cc 49
需求:
1、对上述数据按key值进行分组
2、对分组后的值进行排序
3、截取分组后值得top 3位以key-value形式返回结果
答案:如下
val groupTopNRdd = sc.textFile("hdfs://db02:8020/user/hadoop/groupsorttop/groupsorttop.data")
groupTopNRdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey().map(
x => {
val xx = x._1
val yy = x._2
(xx, yy.toList.sorted.reverse.take(3))
}
).collect
scala 排序?
(1)sorted:对一个集合进行自然排序,通过传递隐式的Ordering
(2)sortBy:对一个属性或多个属性进行排序,通过它的类型。
(3)sortWith:基于函数的排序,通过一个comparator函数,实现自定义排序的逻辑。
val xs=Seq(1,5,3,4,6,2)
println("==============sorted排序=================")
println(xs.sorted) //升序
println(xs.sorted.reverse) //降序
println("==============sortBy排序=================")
println( xs.sortBy(d=>d) ) //升序
println( xs.sortBy(d=>d).reverse ) //降序
println("==============sortWith排序=================")
println( xs.sortWith(_<_) )//升序
println( xs.sortWith(_>_) )//降序
18.窄依赖 父RDD 的 partition 和 子RDD 的 parition 是不是都是一对一的关系?
答:不一定,除了 一 对 一 的窄依赖,还包含 一 对 固定个数 的窄依赖(就是对父RDD的依赖的Partition的数量不会随着RDD数量规模的改变而改变),比如join操作的每个partiion仅仅和已知的partition进行join,这个join操作是窄依赖,依赖固定数量的父rdd,因为是确定的partition关系
19.Hadoop中,Mapreduce操作的mapper和reducer阶段相当于spark中的哪几个算子?
答:相当于spark中的map算子和reduceByKey算子,当然还是有点区别的,MR会自动进行排序的,spark要看你用的是什么partitioner
20.什么是shuffle,以及为什么需要shuffle?
shuffle中文翻译为洗牌,需要shuffle的原因是:某种具有共同特征的数据汇聚到一个计算节点上进行计算
21.不需要排序的hash shuffle是否一定比需要排序的sort shuffle速度快?
答:不一定!!当数据规模小,Hash shuffle快于Sorted Shuffle数据规模大的时候;当数据量大,sorted Shuffle会比Hash shuffle快很多,因为数量大的有很多小文件,不均匀,甚至出现数据倾斜(如果有一个key 数据特别大,sorted Shuffle 是按顺序读取数据,达到一定阈值后,会把内存中的数据先排序,然后溢写到磁盘,然后对每个文件进行合并,所以数据时均匀分配的),消耗内存大,1.x之前spark使用hash,适合处理中小规模,1.x之后,增加了Sorted shuffle,Spark更能胜任大规模处理了。
22.Spark中的HashShufle的有哪些不足?
答:1)shuffle产生海量的小文件在磁盘上,此时会产生大量耗时的、低效的IO操作;2).容易导致内存不够用,由于内存需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模比较大的化,容易出现OOM;3)容易出现数据倾斜,导致OOM
23.conslidate是如何优化Hash shuffle时在map端产生的小文件?
答:1)conslidate为了解决Hash Shuffle同时打开过多文件导致Writer handler内存使用过大以及产生过多文件导致大量的随机读写带来的低效磁盘IO;2)conslidate根据CPU的个数来决定每个task shuffle map端产生多少个文件,假设原来有10个task,100个reduce,每个CPU有10个CPU
那么使用hash shuffle会产生10100=1000个文件,conslidate产生1010=100个文件
备注:conslidate部分减少了文件和文件句柄,并行读很高的情况下(task很多时)还是会很多文件
24.Sort-basesd shuffle产生多少个临时文件
答:2*Map阶段所有的task数量,Mapper阶段中并行的Partition的总数量,其实就是Mapper端task
25.Sort-based shuffle的缺陷?
- 如果mapper中task的数量过大,依旧会产生很多小文件,此时在shuffle传递数据的过程中reducer段,reduce会需要同时大量的记录进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃
2)如果需要在分片内也进行排序,此时需要进行mapper段和reducer段的两次排序
26.Spark shell启动时会启动derby?
答: spark shell启动会启动spark sql,spark sql默认使用derby保存元数据,但是尽量不要用derby,它是单实例,不利于开发。会在本地生成一个文件metastore_db,如果启动报错,就把那个文件给删了 ,derby数据库是单实例,不能支持多个用户同时操作,尽量避免使用
27.spark.default.parallelism这个参数有什么意义,实际生产中如何设置?
答:1)参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能;2)很多人都不会设置这个参数,会使得集群非常低效,你的cpu,内存再多,如果task始终为1,那也是浪费,spark官网建议task个数为CPU的核数*executor的个数的2~3倍。
28.spark.storage.memoryFraction参数的含义,实际生产中如何调优?
答:1)用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6,,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。2)如果持久化操作比较多,可以提高spark.storage.memoryFraction参数,使得更多的持久化数据保存在内存中,提高数据的读取性能,如果shuffle的操作比较多,有很多的数据读写操作到JVM中,那么应该调小一点,节约出更多的内存给JVM,避免过多的JVM gc发生。在web ui中观察如果发现gc时间很长,可以设置spark.storage.memoryFraction更小一点。
29.spark.shuffle.memoryFraction参数的含义,以及优化经验?
答:1)spark.shuffle.memoryFraction是shuffle调优中 重要参数,shuffle从上一个task拉去数据过来,要在Executor进行聚合操作,聚合操作时使用Executor内存的比例由该参数决定,默认是20%
如果聚合时数据超过了该大小,那么就会spill到磁盘,极大降低性能;2)如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值
30.介绍一下你对Unified Memory Management内存管理模型的理解?
答:Spark中的内存使用分为两部分:执行(execution)与存储(storage)。执行内存主要用于shuffles、joins、sorts和aggregations,存储内存则用于缓存或者跨节点的内部数据传输。1.6之前,对于一个Executor,内存都有哪些部分构成:
1)ExecutionMemory。这片内存区域是为了解决 shuffles,joins, sorts and aggregations 过程中为了避免频繁IO需要的buffer。 通过spark.shuffle.memoryFraction(默认 0.2) 配置。
2)StorageMemory。这片内存区域是为了解决 block cache(就是你显示调用dd.cache, rdd.persist等方法), 还有就是broadcasts,以及task results的存储。可以通过参数 spark.storage.memoryFraction(默认0.6)。设置
3)OtherMemory。给系统预留的,因为程序本身运行也是需要内存的。 (默认为0.2).
传统内存管理的不足:
1).Shuffle占用内存0.2*0.8,内存分配这么少,可能会将数据spill到磁盘,频繁的磁盘IO是很大的负担,Storage内存占用0.6,主要是为了迭代处理。传统的Spark内存分配对操作人的要求非常高。(Shuffle分配内存:ShuffleMemoryManager, TaskMemoryManager,ExecutorMemoryManager)一个Task获得全部的Execution的Memory,其他Task过来就没有内存了,只能等待。
2).默认情况下,Task在线程中可能会占满整个内存,分片数据特别大的情况下就会出现这种情况,其他Task没有内存了,剩下的cores就空闲了,这是巨大的浪费。这也是人为操作的不当造成的。
3).MEMORY_AND_DISK_SER的storage方式,获得RDD的数据是一条条获取,iterator的方式。如果内存不够(spark.storage.unrollFraction),unroll的读取数据过程,就是看内存是否足够,如果足够,就下一条。unroll的space是从Storage的内存空间中获得的。unroll的方式失败,就会直接放磁盘。
4). 默认情况下,Task在spill到磁盘之前,会将部分数据存放到内存上,如果获取不到内存,就不会执行。永无止境的等待,消耗CPU和内存。
在此基础上,Spark提出了UnifiedMemoryManager,不再分ExecutionMemory和Storage Memory,实际上还是分的,只不过是Execution Memory访问Storage Memory,Storage Memory也可以访问Execution Memory,如果内存不够,就会去借。