笔者最近在做一些任务的优化,大多的场景是因为采用row_number()
进行分组去重,所以耗时特别高。样例代码如下:
select *
from (
select *
from (
select *,
row_number() over(partition by id order by ts desc) rn
from table
) as a
) as a
where rn = 1
原因
要做这个操作,不得不做的就是shuffle,而且因为要保留每行数据,没法在map端做合并,所以造成shuffle的量特别大。
优化思路
减少shuffle
map端合并
假如没有这样的窗口操作的算子,我们自己来实现一个这样的算子的话,其实是可以在map端做数据合并的。
例如,我们可以把rn=1
的操作前置,map端根据分区key和ts排序之后只保留1条就可以,然后reduce端把多个分区的1条记录合并再得到最终的一条。
目前Spark并没有这样的算子,但是可以用其他算子模仿一下,比如max算子,示例代码如下:
select id, split(tuple, ',') arr
from (
select id, max(concat_ws(',', ts, col1, col2, ..)) tuple
from table
group by id
) as a
在笔者的场景中,确实是可以减少shuffle的数据量,但是下一个stage会很耗时。所以这个是一个策略,但是需要按照自己的情况测试下,也要结合数据量。
如果重复的数据不多,这种策略并不是很好。
复用分区
这个是笔者偶然间发现的一个场景。
笔者的业务SQL如下:
select a.*, b.col
from (
select *, row_number() over (partition by id, fid order by client_time desc) rn
from table
) as a
left join mapping_tb as b
on a.id = b.id
where rn = 1
业务逻辑:
分区去重+关联维度表
执行计划如下:
+- SortMergeJoin LeftOuter (18)
:- Sort (10)
: +- Exchange (9)
: +- Filter (7)
: +- Window (6)
: +- Sort (5)
: +- ShuffleQueryStage (4)
: +- Exchange (3)
: +- * ColumnarToRow (2)
: +- Scan orc table (1)
+- Sort (17)
+- ShuffleQueryStage (16)
+- Exchange (15)
+- * Project (14)
+- * Filter (13)
+- * ColumnarToRow (12)
+- Scan orc mapping_tb (11)
可以看到关联维度表用的key为id,分区去重用的key是id+fid,前者的分区肯定包含后者,是不是可以在这上面做些文章。
带着这样的怀疑,对逻辑进行了调整,如下:
select *
from (
select *, row_number() over (partition by id, fid order by client_time desc) rn
from (
select a.*, b.col
from table as a
left join mapping_tb as b
on a.id = b.id
) as a
) as a
where rn = 1
执行计划如下:
+- Filter (17)
+- Window (16)
+- Sort (15)
+- Project (14)
+- SortMergeJoin LeftOuter (13)
:- Sort (5)
: +- ShuffleQueryStage (4)
: +- Exchange (3)
: +- * ColumnarToRow (2)
: +- Scan orc table (1)
+- Sort (12)
+- ShuffleQueryStage (11)
+- Exchange (10)
+- * Project (9)
+- * Filter (8)
+- * ColumnarToRow (7)
+- Scan orc mapping_tb (6)
从这里可以看出SortMergeJoin
之后做窗口操作,并没有做Shuffle,而是直接接了一个Sort
操作,这就是复用分区。
出于对比我们将上述的SQL做调整并比较执行计划。
select *
from (
select *, row_number() over (partition by id2, fid order by client_time desc) rn
from (
select a.*, b.col
from table as a
left join mapping_tb as b
on a.id = b.id
) as a
) as a
where rn = 1
执行计划如下:
+- Filter (18)
+- Window (17)
+- Sort (16)
+- Exchange (15)
+- Project (14)
+- SortMergeJoin LeftOuter (13)
:- Sort (5)
: +- ShuffleQueryStage (4)
: +- Exchange (3)
: +- * ColumnarToRow (2)
: +- Scan orc table (1)
+- Sort (12)
+- ShuffleQueryStage (11)
+- Exchange (10)
+- * Project (9)
+- * Filter (8)
+- * ColumnarToRow (7)
+- Scan orc mapping_tb (6)
可以看到,修改之后的SQL执行计划多了一个Exchange (15)
,说明这里需要再做一次Shuffle。
实际在执行过程中,Shuffle采用了id
做分区,Shuffle的数据量能减少大概1倍,而且后续没有再次的shuffle,即我们将shuffle的数据量减少了1倍。