1.Spark 中的数据倾斜问题
- 调整并行度分散同一个 Task 的不同 Key: Spark 在做 Shuffle 时,默认使用 HashPartitioner对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的 Key 对应的数据被分配到了同一个 Task 上,造成该 Task 所处理的数据远大于其它 Task,从而造成数据倾斜。如果调整 Shuffle 时的并行度,使得原本被分配到同一 Task 的不同 Key 发配到不同 Task 上处理,则可降低原 Task 所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。图中左边绿色框表示 kv 样式的数据,key 可以理解成 name。可以看到 Task0 分配了许多的 key,调整并行度,多了几个 Task,那么每个 Task 处理的数据量就分散了。
- 自定义Partitioner: 使用自定义的 Partitioner(默认为 HashPartitioner),将原本被分配到同一个 Task 的不同 Key 分配到不同 Task,可以拿上图继续想象一下,通过自定义 Partitioner 可以把原本分到 Task0 的 Key 分到 Task1,那么 Task0 的要处理的数据量就少了。
- 将 Reduce side(侧) Join 转变为 Map side(侧) Join: 通过 Spark 的 Broadcast 机制,将 Reduce 侧 Join 转化为 Map 侧 Join,避免 Shuffle 从而完全消除 Shuffle 带来的数据倾斜。可以看到 RDD2 被加载到内存中了。
- 为 skew 的 key 增加随机前/后缀: 为数据量特别大的 Key 增加随机前/后缀,使得原来 Key 相同的数据变为 Key 不相同的数据,从而使倾斜的数据集分散到不同的 Task 中,彻底解决数据倾斜问题。Join 另一则的数据中,与倾斜 Key 对应的部分数据,与随机前缀集作笛卡尔乘积,从而保证无论数据倾斜侧倾斜 Key 如何加前缀,都能与之正常 Join。
2.RDD 如何通过记录更新的方式容错
RDD 的容错机制实现分布式数据集容错方法有两种: 1. 数据检查点 2. 记录更新。
RDD 采用记录更新的方式:记录所有更新点的成本很高。所以,RDD只支持粗颗粒变换,即只记录单个块(分区)上执行的单个操作,然后创建某个 RDD 的变换序列(血统 lineage)存储下来;变换序列指,每个 RDD 都包含了它是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。因此 RDD 的容错机制又称“血统”容错。
3.Spark作业提交流程
spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGScheduler 和 TaskScheduler。
TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。
Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executer。
Executor 启动后,会自己反向注册到 TaskScheduler 中。
所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。
每执行到一个 Action,就会创建一个 Job。
Job 会提交给 DAGScheduler。
DAGScheduler 会将 Job划分为多个 stage,然后每个 stage 创建一个 TaskSet。
TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。
Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。
(TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)
4.Spark的存储体系
https://blog.csdn.net/LINBE_blazers/article/details/89048435
5.Spark如何实现序列化组件
Java序列化
在默认情况下,Spark采用Java的ObjectOutputStream序列化一个对象。该方式适用于所有实现了java.io.Serializable的类。通过继承java.io.Externalizable,你能进一步控制序列化的性能。Java序列化非常灵活,但是速度较慢,在某些情况下序列化的结果也比较大。
Kryo序列化
Spark也能使用Kryo(版本2)序列化对象。Kryo不但速度极快,而且产生的结果更为紧凑(通常能提高10倍)。Kryo的缺点是不支持所有类型,为了更好的性能,你需要提前注册程序中所使用的类(class)。
Java的序列化比较简单,就和前面的一样,下面主要介绍Kryo序列化的使用。
Kryo序列化怎么用?
可以在创建SparkContext之前,通过调用System.setProperty("spark.serializer", "spark.KryoSerializer"),将序列化方式切换成Kryo。
但是Kryo需要用户进行注册,这也是为什么Kryo不能成为Spark序列化默认方式的唯一原因,但是建议对于任何“网络密集型”(network-intensive)的应用,都采用这种方式进行序列化方式。
Kryo文档描述了很多便于注册的高级选项,例如添加用户自定义的序列化代码。
如果对象非常大,你还需要增加属性spark.kryoserializer.buffer.mb的值。该属性的默认值是32,但是该属性需要足够大以便能够容纳需要序列化的最大对象。
最后,如果你不注册你的类,Kryo仍然可以工作,但是需要为了每一个对象保存其对应的全类名(full class name),这是非常浪费的