Spark RDD join 优化

  1. 大表 join 小表 -> broadcast join
    对小的 RDD 调用 collectAsMap() 回收至 driver, 再将其 broadcast, 用另一个 RDD 查表关联.

  2. hash join

val rdd1: RDD[(KeyType, ValueType)] =.... // 大 RDD, partitions = 1000
val rdd2: RDD[(keyType, ValueType)] = .... // 较小 RDD, partitions = 100

val rdd2Partioned = rdd2.partitionBy(new HashPartitioner(1000))  // 保持与大的 rdd1 partitions 一致

val rdd2HashMapedPartioned = rdd2Partioned
.mapPartitions { iter =>
  val mapData = new HashMap[KeyType, ValueType]()
  iter.map { case (key, value) =>
    mapData.put(key, value)
  }
  Iterator(mapData)
}

rdd1
.zipPartitions(rdd2, preservesPartitioning = true)({ case (iter1, iter2) =>
  iter2.map { rdd2Map =>
    iter1.map { case (key, value) =>
      val rdd2KeyValue = rdd2Map.get(key)
      // 这里可以根据 rdd1 中的 Key, Value, 和 rdd2 中对应的 map 去做相应的逻辑处理
    }
  }
})

同一个 key 多个 join, 使用 union + group by key

image.png

进一步优化:
union 时, 避免使用 rdd1.union(rdd2).union(rdd3).union(rdd4) 的操作, union 只适合两个 rdd 的操作, 多个 rdd union 时采用 SparkContext.union(Array(RDD)), 以此避免 union 操作嵌套层数太多, 从而过程调用链路太长, 耗时甚至引发 StackOverFlow

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容