关于 spark 集成 gluten 的 shuffle 机制

spark 为什么要集成 gluten

  • 随着 spark 版本的迭代,spark 的 benchmark 的提升逐渐趋于平缓
  • 随着硬件技术的发展,网络磁盘都有较大的提升但是 cpu 并没有很大的突破,cpu 可能会成为瓶颈
  • 在 java 中 cpu 的计算集中度不够高(cpu 用来纯计算的时间占比较少),没有充分利用 cpu 的并行计算能力
  • java 直接操作更高级的可并行化处理数据的 cpu 指令集困难

cpu 数据并行处理 & 向量化

并行处理

  • 举例说明:
    1. 假如要对两个数组的相同下标的元素相加
    2. 不管寄存器有多长对于两个整型数据相加他的长度只会占用前面的部分 bit (32个bit) 浪费了后面的bit 长度所以可以进行优化
    3. 将多个同质的指令计算需求压到同一个寄存器里面用一组指令来完成

向量化计算的好处

  1. 感觉向量化的前提就是列计算。需要把同质化的计算放一块
  2. 减少虚函数的调用(java 面向接口编程,在计算时需要找到真正的实现方法),使得 cpu 的时间能够更加专注在 计算这一块
  • 比如 spark sql 都会通过 codegen 的方式去将 sql 逻辑代码化,而 codegen 出来的代码就是一个个不一样的 class. 没有继承多态等。这可能就是为啥要 codegen 而不再抽象公共类的原因吧
  1. 能够用上 cpu SIMD 指令集等技术对数据进行并行处理
  2. 因为输入是列数据所以会比较集中,cpu cache 的数据可能都是要计算的列的数据,相比行的数据,空间更节省,所以可以 cache 更多的列数据。cache 的数据可能就是下一步计算需要的,所以能够提供 cpu 的缓存命中率。

note: 由于对硬件了解不多,理解可能也不对

gluten 与 spark 结合的简单实现

  • 将 Spark 物理计划转换为 Substrate plain,然后 Substrait plain 通过JNI调用传递给 native。
  • Substrait 是一种可以跨平台将一种 plan 转化成另一种 plan 的通用转化格式。(gluten 作为 spark 和 native 向量化执行引擎的中间层,为了方便拓展不同的 native 向量化执行引擎所以采用了 Substrait)
  • 在 native 层,构建 native 算子操作链然后在 native 引擎里面去执行。
  • native 层与 spark 数据交互以 spark ColumnarBatch 的形式进行交互
  • 交互的数据格式可能会涉及到 apache arrow
  • arrow 是一种通用的数据共享的数据结构,它再内存中的格式就是序列化后的格式,传输过程不需要序列化和反序列化
  • 由于 native 引擎对数据格式,udf,算子等的不支持导致 stage 没办法在 native 中执行。而一个 spark sql 作业中会根据宽依赖划分有不同的 stage, 所以stage 间的 shuffle write/shuffle read 会遇到如下情况,针对这些场景是怎么实现的呢?


    47b2570df007adb30f97b3a3f95a6a6.png

spark 集成 gluten 的 shuffle 实现

看 gluten shuffleManager 之前先看基于 spark 框架 shuffleManager 的主要作用

spark shuffleManager

  • org.apache.spark.shuffle.ShuffleManager 是为 shuffle systems 提供的一种插件试的接口
  • 接口核心有 3 个方法
    registerShuffle、getWriter、getReader
  • 在构建 ShuffleDependency 时会调用 shuffle manager 的注册方法,注册返回一个 ShuffleHandle 之后 ShuffleDependency 会持有 ShuffleHandle 的属性
  • 在 shuffler write 阶段会调用 manager.getWriter 并且会把这个 ShuffleHandle 句柄加进去去创建 shuffler write
    shuffler write 会返回一个 MapStatus ,这个在 spark3 的 AQE 里面会用到去动态优化 stage 的执行计划
  • 在 shuffler read 阶段会调用 manager.getReader 并且会把这个 ShuffleHandle 句柄加进去去创建 shuffler read

spark shuffler writer

  • 在 spark diriver 会对 stage 转成成一组 task (ShuffleMapTask 或者 ResultTask) 给到 excutor 去执行,当 ShuffleMapTask 在 excutor 端调用起来后会执行 runTask 方法,runTask 方法在最后会 调用 shuffleDependency.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
  • 其实这都只是把链路打通了一切都是 lazy 都是流式的。 这个抽象的 rdd 一直 next . 这里可能会有疑问 为什么到了 excutor 端了还他妈的是 rdd, 不应该是一个具体的数据集了吗? 没错他还传了个 partition,所以只会去计算这个 rdd 的某个分区然后狗日的一直 next 进行 shuffle 输出。
  • 那这个迭代器是怎么来的呢,就是 rdd 的compute 方法构造出来的, 构造逻辑就是 shufflerReader.read() 经过某种 tramsform(比如执行 codegen 的代码)
  • 总之就是从 shuffle read 里面读数据执行计算逻辑就得到了这个计算链路,链路是 lazy 的 。直到 write 调用 next 方法触发链路数据流的计算

gluten shuffleManager 实现

shuffleManager 是 spark 框架的而 native 的执行还是遵循这套框架, shuffle write /shuffe read 还是要走 spark 的框架走的 JVM 所以先实现 ShuffleManager 接口再看数据在 JVM 和 native 层的流转

  1. 实现 org.apache.spark.shuffle.ShuffleManager 接口 的 registerShuffle、getWriter、getReader 方法

registerShuffle:

  • spark 构建ShuffleDependency 时 -> 调用 registerShuffle
  • registerShuffle 方法实现会判断 ShuffleDependency 是否为 gluten 的 ColumnarShuffleDependency 如果是构建出一个 ColumnarShuffleHandle 返回,如果不是就走原生 shuffleManager 的逻辑,去创建 spark 自身的 BypassMergeSortShuffleHandle 或者 SerializedShuffleHandle 或者 BaseShuffleHandle
  • ColumnarShuffleDependency 什么时候创建与spark ShuffleDependency 有什么不一样?
  • ColumnarShuffleDependency 的创建大概逻辑如下
1. gluten-core: org.apache.gluten.extension.columnar.TransformExchange 
1.1. 如果 gluten 开关开启会调用 BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(plan, child) 
     把 SparkPlan(ShuffleExchangeExec, BroadcastExchangeExec) 进行转化

2. backends-velox: org.apache.gluten.backendsapi.velox.SparkPlanExecApiImpl#genColumnarShuffleExchange 
2.1 创建 ColumnarShuffleExchangeExec
2.3 ColumnarShuffleExchangeExec extends ShuffleExchangeLike with GlutenPlan

3. 什么时候创建的 ColumnarShuffleExchangeExec
gluten-core: org.apache.spark.sql.execution.ColumnarShuffleExchangeExec#columnarShuffleDependency

gluten-core: org.apache.spark.sql.execution.ColumnarShuffleExchangeExec#prepareShuffleDependency

backends-velox: org.apache.gluten.backendsapi.velox.SparkPlanExecApiImpl#genShuffleDependency

gluten-data: org.apache.spark.sql.execution.utils.ExecUtil#genShuffleDependency 
   创建出了 ColumnarShuffleDependency
  • 总结就是 sparkPlan 会转化成 glutenPlan 时会创建出 ColumnarShuffleDependency

  • ColumnarShuffleDependency 与 spark ShuffleDependency 有什么不一样?

  • ColumnarShuffleDependency 针对 shuffle 数据会有特别的序列化器,只用在读取 shuffle 数据序列化器对 shuflle 数据进行反序列化。 不需要序列化因为数据是从 native 里面吐出来的,内部应该还是字节数组的形式

关于序列化器的创建和解释

在构建 shufflerDependcy 时会创建 反序列化器
1. gluten-core: org.apache.spark.sql.execution.ColumnarShuffleExchangeExec#serializer
1.1 val serializer: Serializer = BackendsApiManager.getSparkPlanExecApiInstance 
.createColumnarBatchSerializer(schema, metrics)

 1.2 默认场景
     org.apache.gluten.vectorized.ColumnarBatchSerializer
     rss 是 Celeborn 场景:
     org.apache.spark.shuffle.CelebornColumnarBatchSerializer
     
2. org.apache.gluten.vectorized.ColumnarBatchSerializer 实现
 2.1. 实际走的 ColumnarBatchSerializerInstance 
 2.2. 构造 ColumnarBatchSerializerInstance 时会初始化一个 Long 类型的 shuffleReaderHandle
 2.3. 这个东西 shuffleReaderHandle 有点抽象,他是 JVM 层和 native 数据转化的重要媒介,native 吐给 JVM 层一般都是一个 long 的数字(怎么理解呢? 内存地址 or 偏移量?),如果需要 JVM 层可以通过这个数字调用 native 方法返回一批数据。
2.4 反序列方法 deserializeStream(in: InputStream)  输入流是 java 的输入流
    也就是 shuffle read 流
2.5  反序列化具体实现:
  2.5.1 private val byteIn: JniByteInputStream = JniByteInputStreams.create(in)
  把 java 的数据流直接写入 native 的堆外内存
  2.5.2. 然后在 JVM 层提供一个对 native 内存迭代访问的包装器
    private val wrappedOut: GeneralOutIterator = new ColumnarBatchOutIterator(
      Runtimes.contextInstance(),
      ShuffleReaderJniWrapper
        .create()
        .readStream(shuffleReaderHandle, byteIn),
      nmm)
  2.5.3. 迭代器的 next 方法
      public ColumnarBatch nextInternal() throws IOException {
       // JVM 的next 实际是调用 native 的 next  native 的next 返回一个 long
        long batchHandle = nativeNext(iterHandle);
        if (batchHandle == -1L) {
          return null; // stream ended
        }
      // long 会映射到一块内存直接包装成 ColumnarBatches,
      // 这过程不涉及到数据的拷贝读取,数据还是在堆外 和 native 层还是一块内存
      // 所以针对 JVM 读取 native 数据是轻量的 
       return ColumnarBatches.create(runtime, batchHandle);
     }

2.6. 总之如果下游 stage 能够放入 native 中执行,那么他就会通过 shuffle reader 把 JVM 的输入流通过 JNI 的方式写入native 的堆外内存,然后在 JVM 层提供一个对 native 数据的引用的轻量的迭代器进行包装
2.7. 如果 上游 stage 数据是 在 JVM 中执行计算的,那么还会有一个 行转列的过程,把行的数据转成 ColumnBatch 再喂给 native

getShufflerWriter:

  1. 会根据 shuffle Hander 去判断是否为 glutlen 的 ColumnarShuffleHandle, 如果是则创建 gluten 的 writer, 否则走原生的逻辑 创建 BypassMergeSortShuffleWriter 或者 UnsafeShuffleWriter 或者 SortShuffleWriter
  2. 与原生 writer 的不一样的地方
    2.1. 最终用的是 org.apache.spark.shuffle.ColumnarShuffleWriter
    2.2. shuffler write 的东西是 rdd compute 出来的数据我们看看 在gluten 里面有哪些rdd
   org.apache.gluten.execution.BroadcastBuildSideRDD
   org.apache.gluten.execution.VeloxBroadcastBuildSideRDD
   org.apache.spark.sql.execution.VeloxColumnarWriteFilesRDD
   org.apache.spark.sql.execution.ShuffledColumnarBatchRDD

2.3. 这些 rdd 返回的类型都是 ColumnBatch, 也就是 c++ 吐出来的就是 ColumnBatch
2.4. ColumnBatch 里面装的是 ColumnVector 集合 的第 0 个ColumnVector 是 IndicatorVector 之后的 ColumnVector 都是 PlaceholderVector
IndicatorVector 会有这个 ColumnBatch 的 long 类型的 Handle,还有一个 runtime (JNI相关的) 可以根据 这个 handle 和 runtime 去获取 真实的数据
这种需要通过 JNI 的方式把 native 数据 load 到 JVM 里面的场景应该是上游是在 native 中进行计算,而下游必须在 JVM 里面进行所以需要一次列转行。
2.5. 最终数据在 native 层进行数据溢写落盘之类的,对于 rss 场景他需要
在创建 shufflerWriter 时实例化一个 pusher 传入 JNI 里面,然后 JNI 会把 调用 pusher 实例把数据 突出来,写入 rss. 此时数据是二进制的。

getShufflerReader:

  • 同 getShufflerWriter 一样也会去判断是否为 glutlen 的 ColumnarShuffleHandle,如果是则创建 spark 的 BlockStoreShuffleReader,如果不是 ColumnarShuffleHandle 他也是会创建 spark 的 BlockStoreShuffleReader
  • 不同点是 gluten 的 shuffler Reader 会传入一个 SerializerManager。 这与 spark 原生的不一样的地方
  1. 他不会对输入流加密和压缩。
  2. 会有特有的反序列实现 这个在registerShuffle 环节已经介绍了

针对最开始的 3 个场景的总结

纯属自己的理解不一定对

  • 场景1. shuffle write 是 native shuffle read 是 jvm
    中间会经过一次列转行。 native 该怎写还是怎么写
  • 场景2. shuffle write 是 jvm shuffle read 是 native
    中间会经过一次行转列, native 改怎么读就怎读
  • 场景3. shuffle write 是 native shuffle read 是 native
    按照 native 的方式读写
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容