Data Clustering是指数据按照读取时的IO粒度紧密聚集,而Data Skipping则根据过滤条件在读取时跳过不相干的数据
Data Skipping一般需要SQL引擎和存储的紧密配合,在SQL引擎中,通过类似“FilterPushDown”或者“Predicate PushDown”的执行计划优化规则把过滤条件下推到存储访问层。在存储访问层,通过文件(如HUDI,Iceberg等)或者RowGroup(如Parquet,ORC等)等级别的Min/Max/BloomFilter等信息结合过滤条件判断是否可以跳过相关文件或文件块。常用的Hive/Spark/Presto等各个SQL引擎以及HUDI/Iceberg/Parquet/ORC等存储格式均支持类似的过滤条件下推及索引技术,不过各引擎可下推的过滤条件以及各存储格式支持的索引类型不尽相同,具体的详情超过本文的讨论范围,有兴趣的可以深入研究
Apache Iceberg是近两年兴起的数据湖存储引擎三剑客(HUDI,Delta Lake,Iceberg)之一,Iceberg提供了表级别的抽象接口,自己在文件中维护表的元数据信息(而非通过Hive Metastore维护),基于此,Iceberg对于表的元数据管理以及表数据本身如何组织存储进行了封装,为众多SQL on Hadoop引擎向真正的分布式数仓演进提供了基础支持:
在Spark写数据任务中,一般最后一个Stage的每个Partition对应一个写出文件,所以我们通过控制最后一个Stage前的Shuffle Partitioner策略,就可以控制最终写出文件的个数以及数据如何在各个文件中分布
比如在Spark SQL中,ORDER BY
可以保证全局有序,而SORT BY
只保证Partition内部有序,即在写入数据时,加上ORDER BY
可以保证文件之间及文件内部数据均是有序的,而SORT BY
只能保证数据文件内部数据有序,数据文件中间数据是会重复存在的
本文只关注文件级别的Data Skipping,所以我们使用了Spark DataSet提供的repartitionByRange
接口,用于实现写出数据的分区之间的数据有序性,并不保证分区数据内部的有序性,对应代码如下:
spark.read
.table("hive_catalog.ssb.lo_iceberg")
.repartitionByRange(1000, $"s_city", $"c_city", $"p_brand")
.writeTo("hive_catalog.ssb.lo_iceberg_order")
.using("iceberg")
.create
Linear Order对于靠前的排序字段,Data Skipping的效果非常好,例如对于s_city
,只需扫描一个文件就拿到了查询结果,但是靠后的排序字段效果就会大打折扣。在实际的测试场景中,由于第一个排序字段s_city
的基数超过了文件数量,所以从第二个排序字段开始已经完全无法Skip任何文件,只能全表扫描全部的1000个文件。
repartitionByRange
提供了一个基于RangePartitioner的Shuffle分区策略,首先从Source表采样数据,对采样数据排序后,按照指定分区个数,选取出对应个数的Partition Boundaries,数据在Shuffle的时候,根据Partition Boundaries判断该数据属于哪个分区,从而保证不同分区数据之间的有序性。
| 过滤字段 | 扫描文件数 | Data Skipping比例 |
| s_city | 1 | 99.9% |
| c_city | 1000 | 0% |
| p_brand | 1000 | 0% |
在多维分析的实际场景中,一般都会有多个常用的过滤字段,Linear Order只对靠前字段有较好的Data Skip效果,通常会采用将低基数字段作为靠前的排序字段,从而才能保证对于后面的排序字段在过滤时也有一定的Data Skipping效果,但这无法从根本上解决问题,需要引入一种新的排序机制,使得多个常用的过滤字段均能够获得比较好的Data Skipping效果。
Interleaved Order
Interleaved Order(即Z-Order)是在图像处理以及数仓中使用的一种排序方式,Z-ORDER曲线可以以一条无限长的一维曲线,穿过任意维度的所有空间,对于一条数据的多个排序字段,可以看作是数据的多个维度,多维数据本身是没有天然的顺序的,但是Z-Order通过一定规则将多维数据映射到一维数据上,构建z-value,从而可以基于一维数据进行排序,此外Z-Order的映射规则保证了按照一维数据排序后的数据同时根据多个排序字段聚集。
参考wikipedia中的Z-Order介绍,可以通过对两个数据比特位的交错填充来构建z-value,如下图所示,对于(x, y)两维数据,数据值 0 ≤ x ≤ 7, 0 ≤ y ≤ 7,构建的z-values以及z-order顺序如下:
可以看到,如果根据z-values的顺序对数据进行排序,并平均分为4个文件,无论我们在查询中使用x或y字段过滤进行点查询,都可以skip一半的不相干文件,如果数据量更大,效果会更好,也就是说,基于Z-Order分区存储的文件,可以在多个字段上都有比较好的Data Skipping效果
对于非正数的数据如何排序:首符号翻转
字符串类型如何排序:一般采用asciII 进行排序,有6位,但是如果有固定前缀,就没法取到合适的编码,所以排序效果就不好
Boundary-based Interleaved Index
为了解决Interleaving Index在实际数据场景中的问题,一个最简单的思路就是针对参与z-value计算的过滤字段取Distinct值进行排序,排序的序号值自然就是从0开始的连续正整数,且和数据本身的顺序保持一致,但是这种做法的计算代价太大了,对于所有参与Z-ORDER字段需要全局排序,构建字典,在Shuffle时基于字典获取映射值参与z-value计算,会严重影响数据写入的速度,在实际场景中并不可行。
我们在测试中实现了一种基于Boundary构建Interleaved Index的方法,在开始阶段,对数据进行采样,从采样的数据中,对每个参与Z-ORDER的字段筛选规定个数的Boundaries并进行排序,每个字段映射为该数据在Boundaries中的Index,然后参与z-value的计算。由于Boundaries的index一定是从0开始的连续正整型数据,完全满足interleaving index的计算需求。
通过Boundary-based Interleaved Index,我们基于Spark实现了一个Z-Order Ordering实现,并重用RangePartitioner对数据进行分区,写入的逻辑如下:
spark.read
.table("hive_catalog.ssb.lo_iceberg")
.repartitionByZOrderRange(1000, $"s_city", $"c_city", $"p_brand")
.writeTo("hive_catalog.ssb.lo_iceberg_zorder")
.using("iceberg")
.create
使用同样的SQL查询后,通过Metric信息拿到扫描文件数量如下:
| 过滤字段 | 扫描文件数 | Data Skipping比例 |
| s_city | 186 | 81.4% |
| c_city | 164 | 83.6% |
| p_brand | 135 | 86.5% |
在目前的公开资料中,Databricks Runtime提供了ZOrder BY的支持[2],但是未提供任何实现细节。AWS DynamoDB实现了Z-Order Index,并在公开文章[3, 4]中介绍了实现思路,使用了上面“Interleaved Index”一节介绍的方式。在开源的大数据生态组件中,目前Hive/Spark/Presto都还没有官方的Z-Order支持,Impala在4.0版本中提供了对ZORDER BY的支持,也使用了类似上面“Interleaved Index”一节介绍的方式进行数据转换,但不是计算z-value,而是实现了一个特殊的Comparator用于顺序比较。
Hilbert Curve Order
Interleaved Order可以按照多个字段分布聚集,但是Z-ORDER曲线也有一个比较小的缺点,就是Z字形之间的连接可能跨度会比较长,在Spark的实现中我们基于Ranger Partitioner切分不同分区数据,切分的Boundary没法准确切中完整的Z字形区域数据,所以IceBerg文件中的Min/Max可能会出现较大的重合,降低Data Skipping的效率。Hibert Curve是另外一种可以用一条无限长的线,穿过任意维度空间里面的所有点的曲线类型[5],并且相对于Z-ORDER曲线,Hibert曲线在其将多维空间转换为一维空间的方法更好地保留了空间邻近性。一到六阶Hibert曲线的示例如下:
可以看到,相比于Z-ORDER曲线,Hibert曲线节点间的临近性更好,没有Z-ORDER曲线中大幅跨空间连接线的存在,这就使得无论我们如何对Hibert曲线进行切分,每个分区对应文件的Min/Max值重合范围都会比较少。
对于Hibert曲线,我们在测试中同样采用了类似Boundary-based Interleaved Index的方式计算hibert-value,首先对数据进行采样,针对每个参与计算的字段选取合适数量的boundaries并排序,使用字段值在boundaries中的index值参与hibert-value的计算。
通过Boundary-based Hibert Index,我们基于Spark实现了一个Hibert Curve Ordering实现,并重用RangePartitioner对数据进行分区,写入的逻辑如下:
spark.read
.table("hive_catalog.ssb.lo_iceberg")
.repartitionByHibertRange(1000, $"s_city", $"c_city", $"p_brand")
.writeTo("hive_catalog.ssb.lo_iceberg_hibert")
.using("iceberg")
.create
使用同样的SQL查询后,通过Metric信息拿到扫描文件数量如下:
| 过滤字段 | 扫描文件数 | Data Skipping比例 |
| s_city | 145 | 85.5% |
| c_city | 131 | 86.9% |
| p_brand | 117 | 88.3% |
相比于Z-Order,经过Hibert Curve Clustering的数据,在三个过滤字段上的Data Skipping比例均有进一步的提升。