欢迎关注公众号“Tim在路上”
通常为提高数据处理的效率,计算引擎要实现谓词的下推,而存储引擎可以根据下推的过滤条件尽可能的跳过无关数据或文件。不管是Hudi、Iceberg还是Delta都实现了基于min-max索引的Data-skiping技术。它指的是在元数据中都记录这数据文件中的每一列的最小值和最大值,通过查询中列上的谓词来决定当前的数据文件是否可能包含满足谓词的任何records,是否可以跳过读取当前数据文件。
但是当当数据均匀分布在所有文件中时,那么每个文件列的upper_bounds和lower_bounds的range会很大,那么这时数据跳过的能力就会失效。其次为了避免分区字段列与其他查询过滤列存在clustering或相关性,一般是建议在查询前进行sort排序。
但是传统的线性排序排序,其跳过效率仅在第一列中很高,但在随后的列中其效果迅速下降。因此,如果有两个或更多列同样可能出现在高度选择性的谓词中,则数据跳过将无法为这个整体带来更好的性能。
从上面图片中的例子可以看出, 对于按字典顺序排列的 3 元组整数,只有第一列能够通过排序将数据聚集起来变成连续可筛选的数据,但是,如果在第三列中找到值为“4”的数据,就会发现它现在分散在各处,根本没有本地化,那数据的跳过性能可想而知。
Z-order 又名Z阶曲线有个非常重要的应用特性,就是降维。它可以将多维空间问题降维到低维或者一维空间问题。将多列转换为一个Z-index列,按照其进行排序,根据Z-Order值相近的数据会分布到同一个文件中的特性,从各个维度的值分布来说,从数据整体来看也会呈现近似单调的分布。但是,文件的upper_bounds和lower_bounds的重合度会有效降低,dataskipping技术又可以重新生效。 Z-order的核心是提高做File Skip 而非 Row Skip, 这样更能减少不必要的IO。除了z-order之外常见的还有希尔伯特曲线(Hilbert curve)。
Z-order 简要说明
映射多维数据到一维,并按照这个维度进行排序
Z-Order的关键在于z-value的映射规则。基于位交叉的技术,每个维度值的比特位交叉出现在最终的z-value里。例如假设我们想计算二维 坐标(x=97, y=214)的z-value,我们可以按如下步骤进行
第一步:将每一维数据用bits表示
x value:01100001 97 98
y value:11010110 104 105
第二步:从y的最左侧bit开始,我们将x和y按位做交叉,即可得到z 值,如下所示
z-value: 1011011000101001 46633
对于多维数据,我们可以采用同样的方法对每个维度的bit位做按位交叉形成 z-value,一旦我们生成z-values 我们即可用该值做排序,基于z值的排序自然形成z阶曲线对多个参与生成z值的维度都有良好的聚合效果。
zorder的性能效果如何呢?下面我们举个例子:
[图片上传失败...(image-eda57c-1657366659242)]
在上面的图片中,每个数据框代表一个文件,每个文件均匀存放4个数据,左边是线性排序后的数据分布,右边是Zorder排序。从中可以看出在查询x = 2 or y = 2的条件时,线性排序需要扫描9个文件,zorder排序只需要扫描7个文件。
Delta的Z-order 的几个细节
可以说实现Z-order并不难,但实现高效的Z-order还是比较复杂的。要实现Z-order, 首先就要考虑如何将多列查询谓词值转换为z-value。
从上面的介绍可以看出要生成z-value,目前最直观的办法是将多维数据转换为二进制然后再进行按位交叉生成z-value。如果直接将不同类型的数据转换为二进制,那么会存在几个问题:
- 如何保证不同类型的维度值(String, Long, Double ...)转成bit位时长度一致?这里可能需要对位数不够的进行左填充补0,另外对于String这类比较长的可能需要进行截取。
- 不同数据类型的null值如何处理?z-value的交叉生成不允许存在null值,这里可以选取min-max值作为null的填充。
从上面可以看出如果直接将多列值转换为二进制,不仅需要为每列值分配新的字节缓冲区,还需要对不同的数据类型进行不同的额外操作,同时由于String截取的存在可能造成数据不精准的存在, 而String类型又是比较常用的类型。
为了解决上述问题,一般采用对查询列进行排序,将每行数据映射为顺序id, 类似于row_number()或dense_rank()或rank()的窗口函数。
然而这种情况下对查询列进行依次排序,可见性能上肯定影响很大。
那么Delta是如何实现的?又是如何解决上述问题的?
Delta采取了降低精度的办法,将连续的值视为一个单位,将任意的查询列转换为range_parition_id()。这里的分区数可以用OPTIMIZE_ZORDERBY_NUM_RANGE_IDS表示。
那么如何实现呢?实现其实非常简单,获取range_parition_id的实际上只需要重用了Spark中的RangePartition操作,并在其基础上实现了range_partition_id(col, N) -> int的表达式。通过这个表达式就实现了将查询类转换为二进制的过程,这个过程避免了额外操作以及多次排序。这样的实现利用RangePartition对键进行采样计算分区边界的实现。
将多个查询列转换为二级制后,然后通过调用interleace_bits(...)交叉的方法,就生成了Z-value。
那么如何进行排序写出实现呢?采用那种方式呢?
如何直接将数据按照Z-value进行全局排序,会存在两个问题:
- 对整个数据排序是非常低效的。
- Z-order曲线中存在“接缝”,其中需要线性遍历在继续其路径之前跳转到不同的区域, 这很不利于小范围内的查询。
那么Delta实现主要是将其按照z-value进行range分区,实际上就是调用了Spark的repartitionByRange的表达式。
如何处理数据倾斜呢?
如果要聚类的列整体上是倾斜的,那么即使转换为z-value也会是倾斜的,这时候如果对其进行排序写出可能会比较耗时。这里的解决办法其实很简单就是在z-value字节数组的结尾追加随机字节,然后再对其进行分区范围内排序。
Delta的Z-order源码分析
下面我们从用户调用的角度来分析下源码:
OPTIMIZE delta.table WHERE day = 25 AND city = ‘New York’ ZORDER BY (col1, col2)
- SQL解析为Optimizer命令,进行执行前校验
当用户执行一条上面的sql后,首先会经历sql解析阶段。Spark使用的是开源组件antlr4将输入SQL解析为AST树。它的解析语法在DeltaSQLBase.g4文件中。
| OPTIMIZE (path=STRING | table=qualifiedName)
(WHERE partitionPredicate = predicateToken)?
(zorderSpec)? #optimizeTable
zorderSpec
: ZORDER BY LEFT_PAREN interleave+=qualifiedName (COMMA interleave+=qualifiedName)* RIGHT_PAREN
| ZORDER BY interleave+=qualifiedName (COMMA interleave+=qualifiedName)*
;
从上面的源码可以看出,OPTIMIZE sql不仅支持表名还支持直接指定的优化文件目录。但这里要注意的是在优化数据布局的时候,where条件的过滤列必须分区分区列的子集。即查询列day和city必须是分区列。
解析g4文件一般是在DeltaSqlParser类中,通过visitZorderSpec方法就可以拿到用户输入的zorder列。而要优化的目录名和表名以及过滤的条件是通过visitOptimizeTable方法获取的。
override def visitOptimizeTable(ctx: OptimizeTableContext): AnyRef = withOrigin(ctx) {
if (ctx.path == null && ctx.table == null) {
throw new ParseException("OPTIMIZE command requires a file path or table name.", ctx)
}
// z-order的列的seq
val interleaveBy = Option(ctx.zorderSpec).map(visitZorderSpec).getOrElse(Seq.empty)
OptimizeTableCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.partitionPredicate).map(extractRawText(_)))(interleaveBy)
}
这里通过访问者的设计模式,获取OptimizeTableContext命令中的数据。从上面可以看出先从visitZorderSpec获取z-order的列的数组,然后将其封装到OptimizeTableCommand类中。OptimizeTableCommand属于command表达式,它在执行时会执行其run方法。
override def run(sparkSession: SparkSession): Seq[Row] = {
val deltaLog = getDeltaLog(sparkSession, path, tableId, "OPTIMIZE")
// [1] 从metadata中获取表的分区列
val partitionColumns = deltaLog.snapshot.metadata.partitionColumns
// [2] 解析查询谓词
val partitionPredicates = partitionPredicate.map(predicate => {
val predicates = parsePredicates(sparkSession, predicate)
verifyPartitionPredicates(
sparkSession,
partitionColumns,
predicates)
predicates
}).getOrElse(Seq.empty)
// [3] 校验zorder列
validateZorderByColumns(sparkSession, deltaLog, zOrderBy)
val zOrderByColumns = zOrderBy.map(_.name).toSeq
// [4] 调用optimize命令
new OptimizeExecutor(sparkSession, deltaLog, partitionPredicates, zOrderByColumns)
.optimize()
}
从这里可以看出在Optimize执行前会先进行校验,首先zorder应该是对应的分区目录内的数据进行zorder, 所以zorder列不应该包含分区列。其次zorder列,必须是在元数据中完成了min-max统计的列,即可以通过其进行数据跳过。最后在调用OptimizeExecutor的optimize方法。下面我们到optimize方法中展开看看:
- 筛选候选文件, 并对文件进行分区压缩
def optimize(): Seq[Row] = {
val txn = deltaLog.startTransaction()
// [1] 按照where条件进行筛选出候选文件
val candidateFiles = txn.filterFiles(partitionPredicate)
val partitionSchema = txn.metadata.partitionSchema
// [2] 注意这里如果isMultiDimClustering多维聚集则不过滤文件的大小直接选择所有的文件
// select all files in case of multi-dimensional clustering
val filesToProcess = candidateFiles.filter(_.size < minFileSize || isMultiDimClustering)
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
这里主要进行候选文件的筛选,同时在优化前进行文件分组。这里需要注意的是如果是多维聚集则不过滤文件的大小直接选择所有的文件。这里的文件分组算法采用的压缩采用的binpack算法,保证每个分组的文件size和均匀。
val parallelJobCollection = new ParVector(jobs.toVector)
// Create a task pool to parallelize the submission of optimization jobs to Spark.
val threadPool = ThreadUtils.newForkJoinPool(
"OptimizeJob",
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS))
val updates = try {
val forkJoinPoolTaskSupport = new ForkJoinTaskSupport(threadPool)
parallelJobCollection.tasksupport = forkJoinPoolTaskSupport
// 并行的执行文件的合并、压缩与Zorder优化
parallelJobCollection.flatMap(partitionBinGroup =>
runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize)).seq
} finally {
threadPool.shutdownNow()
}
然后并发的执行文件的合并、压缩与Zorder优化,合并与压缩当然和zorder没有什么关系,这是Optimizer原有功能的重用,可以优化zorder排序后的性能。下面我们进入runOptimizeBinJob方法中,主要看看Zorder优化的实现。
- 根据多维列值生成Z-value
// [1] 读取分组的文件
val input = txn.deltaLog.createDataFrame(txn.snapshot, bin, actionTypeOpt = Some("Optimize"))
val repartitionDF = if (isMultiDimClustering) {
val totalSize = bin.map(_.size).sum
// 分区数为分组文件的size大小除以其中最大的文件size
val approxNumFiles = Math.max(1, totalSize / maxFileSize).toInt
// [2] 调用 MultiDimClustering.cluster
MultiDimClustering.cluster(
input,
approxNumFiles,
zOrderByColumns)
} else {
input.coalesce(numPartitions = 1)
}
合并后的文件数为分组文件的size大小除以其中最大的文件size。这里就是读取分组文件,然后调用cluster方法。
override def cluster(
df: DataFrame,
colNames: Seq[String],
approxNumPartitions: Int): DataFrame = {
val conf = df.sparkSession.sessionState.conf
// 用于控制rangeId,越大,精度越好,但以牺牲性能为代价
val numRanges = conf.getConf(DeltaSQLConf.MDC_NUM_RANGE_IDS)
// 是否追加噪音,为了避免数据倾斜的追加的噪音后缀
val addNoise = conf.getConf(DeltaSQLConf.MDC_ADD_NOISE)
val cols = colNames.map(df(_))
// 执行cluster表达式,生成z-value
val mdcCol = getClusteringExpression(cols, numRanges)
val repartitionKeyColName = s"${UUID.randomUUID().toString}-rpKey1"
...
}
在cluster中,获取numRanges和addNoise配置,然后再调用getClusteringExpression来获取z-value列。
object ZOrderClustering extends SpaceFillingCurveClustering {
override protected[skipping] def getClusteringExpression(
cols: Seq[Column], numRanges: Int): Column = {
assert(cols.size >= 1, "Cannot do Z-Order clustering by zero columns!")
// [1] 调用range_partition_id表达式生成rangeIdCols
val rangeIdCols = cols.map(range_partition_id(_, numRanges))
// [2] 执行interleave_bits,并转换为String
interleave_bits(rangeIdCols: _*).cast(StringType)
}
}
[1] 调用range_partition_id表达式生成rangeIdCols
[2] 执行interleave_bits,并转换为String,这就是最终生成的z-value
range_partition_id函数就是range_partition_id(col, N) -> int的实现过程,通过上面的分区其实其是重用了Spark的RangePartition下面我们展开看看,这里是如何调用的。
def range_partition_id(col: Column, numPartitions: Int): Column = withExpr {
RangePartitionId(col.expr, numPartitions)
}
range_partition_id的实现非常简单,只是简单的将其封装为RangePartitionId类并返回,RangePartitionId类是一个空的表达式操作。那么它是如果调用RangePartition的呢?
其实这个涉及到了SparkSQL的执行优化过程,SQL在执行前,通常需要先进行RBO优化,CBO等优化过程,这些优化的实现通常以Rule的形式进行注册封装,优化后才转换为RDD再执行Spark任务。
extensions.injectOptimizerRule { session =>
new RangePartitionIdRewrite(session)
}
上面的代码就是在优化器中注入了一个RangePartitionIdRewrite规则,用于重写 range_partition_id的占位符来调用RangePartitioner。
case RangePartitionId(expr, n) =>
val aliasedExpr = Alias(expr, "__RPI_child_col__")()
val exprAttr = aliasedExpr.toAttribute
// [1] 对于查询列过滤null的行
val planForSampling = Filter(IsNotNull(exprAttr), Project(Seq(aliasedExpr), node.child))
...
withCallSite(session.sparkContext, desc) {
SQLExecution.withNewExecutionId(qeForSampling) {
withJobGroup(session.sparkContext, jobGroupId, desc) {
// [2] 创建一个pair(InternalRow, null), 用于存储查询列对应的rangeid
val rddForSampling = qeForSampling.toRdd.mapPartitionsInternal { iter =>
val mutablePair = new MutablePair[InternalRow, Null]()
iter.map(row => mutablePair.update(row.copy(), null))
}
// [3] 创建RangePartitioner,传入排序的sortOrder
val sortOrder = SortOrder(exprAttr, Ascending)
implicit val ordering = new LazilyGeneratedOrdering(Seq(sortOrder), Seq(exprAttr))
val partitioner = new RangePartitioner(n, rddForSampling, true, sampleSizeHint)
// [4] 调用PartitionerExpr,执行写入rangeid
PartitionerExpr(expr, partitioner)
override def eval(input: InternalRow): Any = {
val value: Any = child.eval(input)
row.update(0, value)
partitioner.getPartition(row)
}
从上面的代码可以看出这里主要做了几件事:
[1] 对于查询列过滤null的行
[2] 创建一个pair(InternalRow, null), 用于存储查询列对应的rangeid
[3] 创建RangePartitioner,传入排序的sortOrder
[4] 调用PartitionerExpr,执行写入rangeid
- 根据z-value进行range重分区
下面我们再回到cluster方法中,看看剩余的代码:
var repartitionedDf = if (addNoise) {
val randByteColName = s"${UUID.randomUUID().toString}-rpKey2"
val randByteCol = (rand() * 255 - 128).cast(ByteType)
df.withColumn(repartitionKeyColName, mdcCol).withColumn(randByteColName, randByteCol)
.repartitionByRange(approxNumPartitions, col(repartitionKeyColName), col(randByteColName))
.drop(randByteColName)
} else {
df.withColumn(repartitionKeyColName, mdcCol)
.repartitionByRange(approxNumPartitions, col(repartitionKeyColName))
}
repartitionedDf.drop(repartitionKeyColName)
这里的代码就非常直观了,其实际上就是调用repartitionByRange表达式,并最终将z-value传入,最终再将拼接的排序分区列删除。最后再调用txn.writeFiles(repartitionDF)进行执行。
- 更新统计信息
if (isMultiDimClustering) {
val inputFileStats =
ZOrderFileStats(removedFiles.size, removedFiles.map(_.size.getOrElse(0L)).sum)
optimizeStats.zOrderStats = Some(ZOrderStats(
strategyName = "all", // means process all files in a partition
inputCubeFiles = ZOrderFileStats(0, 0),
inputOtherFiles = inputFileStats,
inputNumCubes = 0,
mergedFiles = inputFileStats,
// There will one z-cube for each partition
numOutputCubes = optimizeStats.numPartitionsOptimized))
}
最后在更新进行合并、压缩和zorder排序后文件的统计信息。
下面我们来总结下整个过程,并对比下和Iceberg、Hudi的实现区别:
- 需要筛选出待优化的文件。OPTIMIZE语句的where条件只支持使用分区列,也就是支持对表的某些分区进行OPTIMIZE。
- 根据多维列值计算出Z地址。这里将不同类型查询列转换为粗放的rangeId, 然后将查询各列的rangId转换为二进制进行交叉组合生成z-value。但是这里的rangeId需要通过专家经验的配置,其次其解决数据倾斜时在z-value数组中随机追加噪音字节。
- 根据z-value进行range重分区。数据会shuffle到多个partition中。这一步等价于repartitionByRange(z-value)。
- 将重分区的partition使用Copy on Write写回到存储系统中,然后更新统计信息。
从这里可以看出Delta实现的z-order和Hudi、Iceberg的实现从本质上来说都是文件间的排序采用zorder, 文件内的排序任然采用线性排序的思想。这样可以避免在小范围查询中(查询正好落入单个文件内)使用线性排序会有更好的性能。但是不同的是生成z-value的方式上的不同,Delta生成z-value的方式是采用映射为rangeid的办法,并未采用直接转换为二进制的办法。这种方式避免了额外操作以及多次排序,但需要更多的专家经验。另外Delta的Zorder操作需要用户手动的执行优化。
下面我们留下几个问题,可以思考下:
- Z-order排序的列一般选择那些列进行排序优化,是否排序的列越多越好?
- Z-order排序后,是否对所有的查询sql有提速的效果,那些场景会不会变的更慢?