大表 join 小表 -> broadcast join
对小的 RDD 调用 collectAsMap() 回收至 driver, 再将其 broadcast, 用另一个 RDD 查表关联.hash join
val rdd1: RDD[(KeyType, ValueType)] =.... // 大 RDD, partitions = 1000
val rdd2: RDD[(keyType, ValueType)] = .... // 较小 RDD, partitions = 100
val rdd2Partioned = rdd2.partitionBy(new HashPartitioner(1000)) // 保持与大的 rdd1 partitions 一致
val rdd2HashMapedPartioned = rdd2Partioned
.mapPartitions { iter =>
val mapData = new HashMap[KeyType, ValueType]()
iter.map { case (key, value) =>
mapData.put(key, value)
}
Iterator(mapData)
}
rdd1
.zipPartitions(rdd2, preservesPartitioning = true)({ case (iter1, iter2) =>
iter2.map { rdd2Map =>
iter1.map { case (key, value) =>
val rdd2KeyValue = rdd2Map.get(key)
// 这里可以根据 rdd1 中的 Key, Value, 和 rdd2 中对应的 map 去做相应的逻辑处理
}
}
})
同一个 key 多个 join, 使用 union + group by key
image.png
进一步优化:
union 时, 避免使用 rdd1.union(rdd2).union(rdd3).union(rdd4) 的操作, union 只适合两个 rdd 的操作, 多个 rdd union 时采用 SparkContext.union(Array(RDD)), 以此避免 union 操作嵌套层数太多, 从而过程调用链路太长, 耗时甚至引发 StackOverFlow