spark 为什么要集成 gluten
- 随着 spark 版本的迭代,spark 的 benchmark 的提升逐渐趋于平缓
- 随着硬件技术的发展,网络磁盘都有较大的提升但是 cpu 并没有很大的突破,cpu 可能会成为瓶颈
- 在 java 中 cpu 的计算集中度不够高(cpu 用来纯计算的时间占比较少),没有充分利用 cpu 的并行计算能力
- java 直接操作更高级的可并行化处理数据的 cpu 指令集困难
cpu 数据并行处理 & 向量化
并行处理
- 举例说明:
- 假如要对两个数组的相同下标的元素相加
- 不管寄存器有多长对于两个整型数据相加他的长度只会占用前面的部分 bit (32个bit) 浪费了后面的bit 长度所以可以进行优化
- 将多个同质的指令计算需求压到同一个寄存器里面用一组指令来完成
向量化计算的好处
- 感觉向量化的前提就是列计算。需要把同质化的计算放一块
- 减少虚函数的调用(java 面向接口编程,在计算时需要找到真正的实现方法),使得 cpu 的时间能够更加专注在 计算这一块
- 比如 spark sql 都会通过 codegen 的方式去将 sql 逻辑代码化,而 codegen 出来的代码就是一个个不一样的 class. 没有继承多态等。这可能就是为啥要 codegen 而不再抽象公共类的原因吧
- 能够用上 cpu SIMD 指令集等技术对数据进行并行处理
- 因为输入是列数据所以会比较集中,cpu cache 的数据可能都是要计算的列的数据,相比行的数据,空间更节省,所以可以 cache 更多的列数据。cache 的数据可能就是下一步计算需要的,所以能够提供 cpu 的缓存命中率。
note: 由于对硬件了解不多,理解可能也不对
gluten 与 spark 结合的简单实现
- 将 Spark 物理计划转换为 Substrate plain,然后 Substrait plain 通过JNI调用传递给 native。
- Substrait 是一种可以跨平台将一种 plan 转化成另一种 plan 的通用转化格式。(gluten 作为 spark 和 native 向量化执行引擎的中间层,为了方便拓展不同的 native 向量化执行引擎所以采用了 Substrait)
- 在 native 层,构建 native 算子操作链然后在 native 引擎里面去执行。
- native 层与 spark 数据交互以 spark ColumnarBatch 的形式进行交互
- 交互的数据格式可能会涉及到 apache arrow
- arrow 是一种通用的数据共享的数据结构,它再内存中的格式就是序列化后的格式,传输过程不需要序列化和反序列化
-
由于 native 引擎对数据格式,udf,算子等的不支持导致 stage 没办法在 native 中执行。而一个 spark sql 作业中会根据宽依赖划分有不同的 stage, 所以stage 间的 shuffle write/shuffle read 会遇到如下情况,针对这些场景是怎么实现的呢?
spark 集成 gluten 的 shuffle 实现
看 gluten shuffleManager 之前先看基于 spark 框架 shuffleManager 的主要作用
spark shuffleManager
- org.apache.spark.shuffle.ShuffleManager 是为 shuffle systems 提供的一种插件试的接口
- 接口核心有 3 个方法
registerShuffle、getWriter、getReader - 在构建 ShuffleDependency 时会调用 shuffle manager 的注册方法,注册返回一个 ShuffleHandle 之后 ShuffleDependency 会持有 ShuffleHandle 的属性
- 在 shuffler write 阶段会调用 manager.getWriter 并且会把这个 ShuffleHandle 句柄加进去去创建 shuffler write
shuffler write 会返回一个 MapStatus ,这个在 spark3 的 AQE 里面会用到去动态优化 stage 的执行计划 - 在 shuffler read 阶段会调用 manager.getReader 并且会把这个 ShuffleHandle 句柄加进去去创建 shuffler read
spark shuffler writer
- 在 spark diriver 会对 stage 转成成一组 task (ShuffleMapTask 或者 ResultTask) 给到 excutor 去执行,当 ShuffleMapTask 在 excutor 端调用起来后会执行 runTask 方法,runTask 方法在最后会 调用 shuffleDependency.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
- 其实这都只是把链路打通了一切都是 lazy 都是流式的。 这个抽象的 rdd 一直 next . 这里可能会有疑问 为什么到了 excutor 端了还他妈的是 rdd, 不应该是一个具体的数据集了吗? 没错他还传了个 partition,所以只会去计算这个 rdd 的某个分区然后狗日的一直 next 进行 shuffle 输出。
- 那这个迭代器是怎么来的呢,就是 rdd 的compute 方法构造出来的, 构造逻辑就是 shufflerReader.read() 经过某种 tramsform(比如执行 codegen 的代码)
- 总之就是从 shuffle read 里面读数据执行计算逻辑就得到了这个计算链路,链路是 lazy 的 。直到 write 调用 next 方法触发链路数据流的计算
gluten shuffleManager 实现
shuffleManager 是 spark 框架的而 native 的执行还是遵循这套框架, shuffle write /shuffe read 还是要走 spark 的框架走的 JVM 所以先实现 ShuffleManager 接口再看数据在 JVM 和 native 层的流转
- 实现 org.apache.spark.shuffle.ShuffleManager 接口 的 registerShuffle、getWriter、getReader 方法
registerShuffle:
- spark 构建ShuffleDependency 时 -> 调用 registerShuffle
- registerShuffle 方法实现会判断 ShuffleDependency 是否为 gluten 的 ColumnarShuffleDependency 如果是构建出一个 ColumnarShuffleHandle 返回,如果不是就走原生 shuffleManager 的逻辑,去创建 spark 自身的 BypassMergeSortShuffleHandle 或者 SerializedShuffleHandle 或者 BaseShuffleHandle
- ColumnarShuffleDependency 什么时候创建与spark ShuffleDependency 有什么不一样?
- ColumnarShuffleDependency 的创建大概逻辑如下
1. gluten-core: org.apache.gluten.extension.columnar.TransformExchange
1.1. 如果 gluten 开关开启会调用 BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(plan, child)
把 SparkPlan(ShuffleExchangeExec, BroadcastExchangeExec) 进行转化
2. backends-velox: org.apache.gluten.backendsapi.velox.SparkPlanExecApiImpl#genColumnarShuffleExchange
2.1 创建 ColumnarShuffleExchangeExec
2.3 ColumnarShuffleExchangeExec extends ShuffleExchangeLike with GlutenPlan
3. 什么时候创建的 ColumnarShuffleExchangeExec
gluten-core: org.apache.spark.sql.execution.ColumnarShuffleExchangeExec#columnarShuffleDependency
gluten-core: org.apache.spark.sql.execution.ColumnarShuffleExchangeExec#prepareShuffleDependency
backends-velox: org.apache.gluten.backendsapi.velox.SparkPlanExecApiImpl#genShuffleDependency
gluten-data: org.apache.spark.sql.execution.utils.ExecUtil#genShuffleDependency
创建出了 ColumnarShuffleDependency
总结就是 sparkPlan 会转化成 glutenPlan 时会创建出 ColumnarShuffleDependency
ColumnarShuffleDependency 与 spark ShuffleDependency 有什么不一样?
ColumnarShuffleDependency 针对 shuffle 数据会有特别的序列化器,只用在读取 shuffle 数据序列化器对 shuflle 数据进行反序列化。 不需要序列化因为数据是从 native 里面吐出来的,内部应该还是字节数组的形式
关于序列化器的创建和解释
在构建 shufflerDependcy 时会创建 反序列化器
1. gluten-core: org.apache.spark.sql.execution.ColumnarShuffleExchangeExec#serializer
1.1 val serializer: Serializer = BackendsApiManager.getSparkPlanExecApiInstance
.createColumnarBatchSerializer(schema, metrics)
1.2 默认场景
org.apache.gluten.vectorized.ColumnarBatchSerializer
rss 是 Celeborn 场景:
org.apache.spark.shuffle.CelebornColumnarBatchSerializer
2. org.apache.gluten.vectorized.ColumnarBatchSerializer 实现
2.1. 实际走的 ColumnarBatchSerializerInstance
2.2. 构造 ColumnarBatchSerializerInstance 时会初始化一个 Long 类型的 shuffleReaderHandle
2.3. 这个东西 shuffleReaderHandle 有点抽象,他是 JVM 层和 native 数据转化的重要媒介,native 吐给 JVM 层一般都是一个 long 的数字(怎么理解呢? 内存地址 or 偏移量?),如果需要 JVM 层可以通过这个数字调用 native 方法返回一批数据。
2.4 反序列方法 deserializeStream(in: InputStream) 输入流是 java 的输入流
也就是 shuffle read 流
2.5 反序列化具体实现:
2.5.1 private val byteIn: JniByteInputStream = JniByteInputStreams.create(in)
把 java 的数据流直接写入 native 的堆外内存
2.5.2. 然后在 JVM 层提供一个对 native 内存迭代访问的包装器
private val wrappedOut: GeneralOutIterator = new ColumnarBatchOutIterator(
Runtimes.contextInstance(),
ShuffleReaderJniWrapper
.create()
.readStream(shuffleReaderHandle, byteIn),
nmm)
2.5.3. 迭代器的 next 方法
public ColumnarBatch nextInternal() throws IOException {
// JVM 的next 实际是调用 native 的 next native 的next 返回一个 long
long batchHandle = nativeNext(iterHandle);
if (batchHandle == -1L) {
return null; // stream ended
}
// long 会映射到一块内存直接包装成 ColumnarBatches,
// 这过程不涉及到数据的拷贝读取,数据还是在堆外 和 native 层还是一块内存
// 所以针对 JVM 读取 native 数据是轻量的
return ColumnarBatches.create(runtime, batchHandle);
}
2.6. 总之如果下游 stage 能够放入 native 中执行,那么他就会通过 shuffle reader 把 JVM 的输入流通过 JNI 的方式写入native 的堆外内存,然后在 JVM 层提供一个对 native 数据的引用的轻量的迭代器进行包装
2.7. 如果 上游 stage 数据是 在 JVM 中执行计算的,那么还会有一个 行转列的过程,把行的数据转成 ColumnBatch 再喂给 native
getShufflerWriter:
- 会根据 shuffle Hander 去判断是否为 glutlen 的 ColumnarShuffleHandle, 如果是则创建 gluten 的 writer, 否则走原生的逻辑 创建 BypassMergeSortShuffleWriter 或者 UnsafeShuffleWriter 或者 SortShuffleWriter
- 与原生 writer 的不一样的地方
2.1. 最终用的是 org.apache.spark.shuffle.ColumnarShuffleWriter
2.2. shuffler write 的东西是 rdd compute 出来的数据我们看看 在gluten 里面有哪些rdd
org.apache.gluten.execution.BroadcastBuildSideRDD
org.apache.gluten.execution.VeloxBroadcastBuildSideRDD
org.apache.spark.sql.execution.VeloxColumnarWriteFilesRDD
org.apache.spark.sql.execution.ShuffledColumnarBatchRDD
2.3. 这些 rdd 返回的类型都是 ColumnBatch, 也就是 c++ 吐出来的就是 ColumnBatch
2.4. ColumnBatch 里面装的是 ColumnVector 集合 的第 0 个ColumnVector 是 IndicatorVector 之后的 ColumnVector 都是 PlaceholderVector
IndicatorVector 会有这个 ColumnBatch 的 long 类型的 Handle,还有一个 runtime (JNI相关的) 可以根据 这个 handle 和 runtime 去获取 真实的数据
这种需要通过 JNI 的方式把 native 数据 load 到 JVM 里面的场景应该是上游是在 native 中进行计算,而下游必须在 JVM 里面进行所以需要一次列转行。
2.5. 最终数据在 native 层进行数据溢写落盘之类的,对于 rss 场景他需要
在创建 shufflerWriter 时实例化一个 pusher 传入 JNI 里面,然后 JNI 会把 调用 pusher 实例把数据 突出来,写入 rss. 此时数据是二进制的。
getShufflerReader:
- 同 getShufflerWriter 一样也会去判断是否为 glutlen 的 ColumnarShuffleHandle,如果是则创建 spark 的 BlockStoreShuffleReader,如果不是 ColumnarShuffleHandle 他也是会创建 spark 的 BlockStoreShuffleReader
- 不同点是 gluten 的 shuffler Reader 会传入一个 SerializerManager。 这与 spark 原生的不一样的地方
- 他不会对输入流加密和压缩。
- 会有特有的反序列实现 这个在registerShuffle 环节已经介绍了
针对最开始的 3 个场景的总结
纯属自己的理解不一定对
- 场景1. shuffle write 是 native shuffle read 是 jvm
中间会经过一次列转行。 native 该怎写还是怎么写 - 场景2. shuffle write 是 jvm shuffle read 是 native
中间会经过一次行转列, native 改怎么读就怎读 - 场景3. shuffle write 是 native shuffle read 是 native
按照 native 的方式读写