Hudi 源码之 Clustering

什么是Clustering

开门见山,Clustering主要有两个作用:数据小文件合并和重排序。
当数据写入Hudi表时,为了提高写入效率和存储利用率,可能会产生大量小文件。Hudi的Clustering机制允许在后台周期性地将这些小文件合并成大文件,从而减少存储碎片和元数据管理开销,提高查询性能。
Clustering过程可以重新组织和排序数据,依据用户指定的列进行排序,这样能提升相关查询的性能,比如范围扫描或者JOIN操作,通过预排序的数据,查询引擎能够更高效地处理查询请求。
本篇首先介绍clustering的配置和操作,然后分析clustering的源代码,包含clustering执行计划的创建和根据计划执行clustering过程两个部分。

Clustering分区过滤策略

Clustering分区过滤策略按照hoodie.clustering.plan.partition.filter.mode配置项过滤出所需的partition。有如下选项:

  • NONE: 不过滤,返回所有partition path。
  • RECENT_DAYS: 按照partition path倒序排序。跳过hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions个partition,返回hoodie.clustering.plan.strategy.daybased.lookback.partitions个partition。如果partition path是日期,可以实现过滤出最近N天的数据。
  • SELECTED_PARTITIONS: 获取hoodie.clustering.plan.strategy.cluster.begin.partitionhoodie.clustering.plan.strategy.cluster.end.partition之间的分区。
  • DAY_ROLLING: 每次clustering一部分分区。如果分区的index对24取余等于排期时候当前时间的小时数,则该分区需要clustering。

配置项

Flink 配置项

  • clustering.schedule.enabled:是否排期clustering。默认值为false。
  • clustering.async.enabled:是否异步执行clustering。默认值为false。
  • clustering.delta_commits:每隔多少次commit之后触发clustering。默认为4。
  • clustering.tasks:clustering并行度。默认和写入并行度相同。
  • clustering.plan.strategy.daybased.lookback.partitions:对应RECENT_DAYS策略,保留多少个分区参与clustering。默认值为2。
  • clustering.plan.strategy.daybased.skipfromlatest.partitions:对应RECENT_DAYS策略,跳过最近多少个分区,之后的分区参与clustering。默认值为0。
  • clustering.plan.strategy.cluster.begin.partition:对应SELECTED_PARTITIONS策略,指定参与clustering的开始分区。无默认值。
  • clustering.plan.strategy.cluster.end.partition:对应SELECTED_PARTITIONS策略,指定参与clustering的结束分区。无默认值。
  • clustering.plan.strategy.partition.regex.pattern:被该正则匹配的分区会参与clustering。无默认值。
  • clustering.plan.strategy.partition.selected:指定要参与clustering的分区。无默认值。
  • clustering.plan.strategy.class:clustering策略类。默认值为FlinkSizeBasedClusteringPlanStrategy。选择最近N天的分区,选取较小的file slice参与clustering。
  • clustering.plan.partition.filter.mode:分区过滤策略。默认值为NONE。
  • clustering.plan.strategy.target.file.max.bytes:每个clustering group(可理解为并行度)clustering完毕之后生成的文件大小上限。默认为1GB。
  • clustering.plan.strategy.small.file.limit:小于该大小的文件会认为是clustering的参与对象。默认值为600MB。
  • clustering.plan.strategy.sort.columns:clustering排序字段。多个字段使用逗号分隔。无默认值。
  • clustering.plan.strategy.max.num.groups:clustering plan阶段创建出的clustering group数量,对应并行度。默认为30。

Spark 配置项

  • hoodie.clustering.plan.strategy.daybased.lookback.partitions:对应RECENT_DAYS策略,保留多少个分区参与clustering。默认值为2。
  • hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions:对应RECENT_DAYS策略,跳过最近多少个分区,之后的分区参与clustering。默认值为0。
  • hoodie.clustering.plan.strategy.cluster.begin.partition:对应SELECTED_PARTITIONS策略,指定参与clustering的开始分区。无默认值。
  • hoodie.clustering.plan.strategy.cluster.end.partition:对应SELECTED_PARTITIONS策略,指定参与clustering的结束分区。无默认值。
  • hoodie.clustering.plan.strategy.small.file.limit:小于该大小的文件会认为是clustering的参与对象。默认值为300MB。
  • hoodie.clustering.plan.partition.regex.pattern:被该正则匹配的分区会参与clustering。无默认值。
  • hoodie.clustering.plan.partition.selected:指定要参与clustering的分区。无默认值。
  • hoodie.clustering.plan.strategy.class:clustering plan策略。默认为org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy。查找小文件(hoodie.clustering.plan.strategy.small.file.limit),这些小文件参与clustering。
  • hoodie.clustering.execution.strategy.class:clustering执行策略。默认为org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy。按照指定的列排序,并满足配置的目标文件大小。
  • hoodie.clustering.inline:是否启用inline clustering。默认为false。
  • hoodie.clustering.inline.max.commits:最多多少次commit触发inline clustering,控制clustering的频率。默认为4。
  • hoodie.clustering.async.max.commits:控制async clustering的频率。默认为4。
  • hoodie.clustering.max.parallelism:clustering的最大并行度。默认为15。
  • hoodie.clustering.group.read.parallelism:Spark从clustering group读取数据的并行度。默认值为20。
  • hoodie.clustering.plan.partition.filter.mode:分区过滤策略。默认为NONE。
  • hoodie.clustering.plan.strategy.max.bytes.per.group:每个clustering group最多产生的数据量。默认为2GB。
  • hoodie.clustering.plan.strategy.max.num.groups:最大clustering group数量。每次clustering的最大操作数据量= hoodie.clustering.plan.strategy.max.bytes.per.group * hoodie.clustering.plan.strategy.max.num.groups。
  • hoodie.clustering.plan.strategy.target.file.max.bytes:每个clustering group生成hoodie.clustering.plan.strategy.max.bytes.per.group / hoodie.clustering.plan.strategy.target.file.max.bytes个file group。
  • hoodie.clustering.plan.strategy.single.group.clustering.enabled:是否能够生成只有一个file group参与的clustering执行计划。默认为true。
  • hoodie.clustering.plan.strategy.sort.columns:clustering排序字段。多个字段使用逗号分隔。无默认值。
  • hoodie.clustering.updates.strategy:update策略。默认为org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy。clustering的时候拒绝更新。配置org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy可允许更新。
  • hoodie.clustering.schedule.inline:是否启用inline clustering排期。默认为false。
  • hoodie.clustering.async.enabled:是否启用async clustering。默认为false。
  • hoodie.layout.optimize.strategy:布局策略。使用LINEAR(线性),ZORDER还是HILBERT(希尔伯特曲线)。默认值是LINEAR。
  • hoodie.layout.optimize.curve.build.method:可配置DIRECT或者SAMPLE。SpatialCurveCompositionStrategyType中SAMPLE的数据排序分布效果较DIRECT更好,但是执行速度更慢。默认配置的是DIRECT类型。
  • hoodie.layout.optimizebuild.curve.sample.size:对应SAMPLE类型,默认值为200000。
  • hoodie.layout.optimize.data.skipping.enable:是否在布局优化完成后收集统计信息来启用数据跳过功能。默认为true。
  • hoodie.clustering.rollback.pending.replacecommit.on.conflict:默认值为false。如果允许对等待clustering的file group进行更新,则应将此配置设置为回滚失败或pending的clustering instants。 仅当插入或更新与pending clustering的file group存在冲突时,pending clustering才会被回滚。 在设置此配置时请务必谨慎,特别是在非常频繁地执行clustering操作时。这在极少数情况下可能导致竞态条件, 例如,在获取到实例后但回滚完成前clustering操作已完成。

离线触发

使用Spark

任务提交命令如下:

spark-submit \  
--class org.apache.hudi.utilities.HoodieClusteringJob \  
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.15.0.jar \  
--props /path/to/config/clusteringjob.properties \  
--mode scheduleAndExecute \  
--base-path /path/to/hudi_table/basePath \  
--table-name hudi_table_schedule_clustering \  
--spark-memory 1g

由于clustering的配置项较多,可以把这些配置项写在/path/to/config/clusteringjob.properties文件中。例如:

hoodie.clustering.async.enabled=true  
hoodie.clustering.async.max.commits=4  
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824  
hoodie.clustering.plan.strategy.small.file.limit=629145600  
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy  
hoodie.clustering.plan.strategy.sort.columns=column1,column2

HoodieClusteringJob的参数如下:

参数名 是否必须 默认值 备注
--base-path - Hudi表根目录
--table-name - 表名
--instant-time - 只在execute模式有效。指定执行哪个instant time的clustering。如果没有指定。执行最早排期的clustering。使用scheduleAndExecute默认该配置项会被忽略。
--parallelism 1 clustering并行度
--spark-master - Spark master
--spark-memory - Spark内存
--retry 0 重试次数
--skip-clean true clustering完毕之后是否跳过clean
--retry-last-failed-clustering-job false 使用scheduleAndExecute有效。是否重试最近失败的clustering job
--mode - schedule表示排期。execute表示执行。scheduleAndExecute表示排期并执行
--help false 打印帮助信息
--job-max-processing-time-ms 0 只有--retry-last-failed-clustering-job和scheduleAndExecute是否有效。如果超过配置时间clustering job仍未完成。Hudi认为该job失败并重新启动
--props - clustering配置参数所在文件。使用properties文件格式
--hoodie-conf - 额外的Hudi配置

使用Flink

Flink的HoodieFlinkClusteringJob不仅有clustering,还包含了archive和clean操作。

任务提交命令如下:

./bin/flink run \
    -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \
    lib/hudi-flink1.17-bundle-0.15.0.jar \
    --path hdfs://xxx:8020/table

参数解析:

参数名 是否必须 默认值 备注
--path - Hudi表的根目录
--clustering-delta-commits 1 最多多少次commit触发clustering,控制clustering的频率
--clustering-tasks false -1 Clustering task 的并发数
--clean-policy false KEEP_LATEST_COMMITS clean策略。可以使用KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS
--clean-retain-commits 10 保留最近n个commit不被清理
--clean-retain-hours 24 保留最近n小时的commit不被清理
--clean-retain-file-versions 5 保留最近n个文件版本不被清理
--archive-min-commits 20 归档commit前保留的最少commit数量
--archive-max-commits 30 归档commit前保留的最多commit数量
--schedule false 是否排期clustering plan
--instant-time - clustering instant time
--clean-async-enabled false 是否启用异步clean
--plan-strategy-class FlinkSizeBasedClusteringPlanStrategy clustering策略类
--plan-partition-filter-mode NONE 分区过滤模式
--seq FIFO Clustering plan的执行顺序。
LIFO: 从最近的plan开始执行,
FIFO: 从最早的plan开始执行
--target-file-max-bytes 1GB 最大目标文件
--small-file-limit 600 小于该大小的文件会参与clustering
--skip-from-latest-partitions 0 clustering跳过最近n个分区
--sort-columns - clustering排序字段。多个字段使用逗号分隔
--sort-memory 128 排序内存大小
--max-num-groups 30 Clustering group个数
--target-partitions 2 参与clustering的分区数
--cluster-begin-partition - Clustering开始分区
--cluster-end-partition - Clustering结束分区
--partition-regex-pattern - 匹配该正则的partition参与clustering
--partition-selected - 指定参与clustering的分区
--service false 是否开启 service 模式,service模式为常驻作业
--min-clustering-interval-seconds 600s 异步clustering服务的最小时间间隔
--hoodie-conf - 额外的Hudi配置
--props - clustering等参数配置所在文件路径

创建clustering执行计划

创建执行计划位于ClusteringPlanActionExecutor类的execute方法,代码如下所示:

  @Override  
  public Option<HoodieClusteringPlan> execute() {  
    // 创建执行计划
    Option<HoodieClusteringPlan> planOption = createClusteringPlan();  
    // 如果计划创建成功(可能存在没有file slice需要cluster的情况)
    if (planOption.isPresent()) {  
      // 创建clustering instant
      // clustering instant的类型是replace commit,意味这clustering之后的数据文件替换掉先前的
      HoodieInstant clusteringInstant =  
          new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);  
      try {  
        HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()  
            .setOperationType(WriteOperationType.CLUSTER.name())  
            .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))  
            .setClusteringPlan(planOption.get())  
            .build();  
        // 添加到pending commit中
        table.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,  
            TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));  
      } catch (IOException ioe) {  
        throw new HoodieIOException("Exception scheduling clustering", ioe);  
      }  
    }  
  
    return planOption;  
  }  
}

该方法创建clustering执行计划,然后再创建一个pending replace commit。因为clutering完成之后,新生成的数据文件会替换掉原有的数据文件,因此对应的commit类型为replace。
继续分析createClusteringPlan方法。其中首先判断是否满足可执行clustering的条件,然后获取配置的clustering策略类,创建clustering计划。
Clustering并不是说每次schedule都必须要执行。为了效率clustering要求至少要经过N次commit之后,才会schedule。此限制通过配置项hoodie.clustering.inline.max.commitshoodie.clustering.async.max.commits(分别对应inline和异步)来控制。如果满足clustering条件,通过hoodie.clustering.plan.strategy.class配置的策略类生成执行计划。
代码如下所示:

protected Option<HoodieClusteringPlan> createClusteringPlan() {  
  LOG.info("Checking if clustering needs to be run on " + config.getBasePath()); 
  // 获取上一次clustering对应的instant 
  Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getLastClusterCommit();  
  // 获取上次clustering之后提交的次数
  int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()  
      .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)  
      .countInstants();  
  // 读取hoodie.clustering.inline.max.commits配置,默认为4
  // 该配置项表示在上次clustering之后至少需要经历几次commit才能schedule下一次clustering
  // 这里处理inline clustering的配置
  if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {  
    LOG.warn("Not scheduling inline clustering as only " + commitsSinceLastClustering  
        + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "  
        + config.getInlineClusterMaxCommits());  
    return Option.empty();  
  }  
  // 同上,但这里处理异步clustering的配置
  // 配置项为hoodie.clustering.async.max.commits,默认值4
  if ((config.isAsyncClusteringEnabled() || config.scheduleInlineClustering()) && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {  
    LOG.warn("Not scheduling async clustering as only " + commitsSinceLastClustering  
        + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "  
        + config.getAsyncClusterMaxCommits());  
    return Option.empty();  
  }  
  
  LOG.info("Generating clustering plan for table " + config.getBasePath());  
  // 加载clustering策略类,对应配置项hoodie.clustering.plan.strategy.class
  // 默认为SparkSizeBasedClusteringPlanStrategy
  ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(  
      ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),  
          new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);  
  // 生成clustering计划
  return strategy.generateClusteringPlan();  
}

接着我们聚焦默认的策略SparkSizeBasedClusteringPlanStrategy。该策略根据文件大小来决定文件数据是否参与clustering。分析clustering计划生成步骤。
generateClusteringPlan方法位于SparkSizeBasedClusteringPlanStrategy的父类PartitionAwareClusteringPlanStrategy中。该方法根据hoodie.clustering.plan.strategy.partition.selectedhoodie.clustering.plan.strategy.partition.regex.patternhoodie.clustering.plan.partition.filter.mode条件过滤出符合要求的partition path。获取它们包含的file slice。从这些file slice中筛选出小文件(小于hoodie.clustering.plan.strategy.small.file.limit的文件)。将这些按照clutering要求的group大小(hoodie.clustering.plan.strategy.max.bytes.per.group),分成若干个group。Group数量上限为hoodie.clustering.plan.strategy.max.num.groups。此步骤对应小文件合并功能。
代码如下所示:

@Override  
public Option<HoodieClusteringPlan> generateClusteringPlan() {  
  if (!checkPrecondition()) {  
    return Option.empty();  
  }  
  // 获取metaclient,用来操作metadata
  HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();  
  LOG.info("Scheduling clustering for " + metaClient.getBasePath());  
  // 获取写配置
  HoodieWriteConfig config = getWriteConfig();  
  // 读取配置项hoodie.clustering.plan.strategy.partition.selected
  // 确定在哪些分区运行clustering
  String partitionSelected = config.getClusteringPartitionSelected();  
  LOG.info("Scheduling clustering partitionSelected: " + partitionSelected);  
  List<String> partitionPaths;  

  // 如果没有配置
  if (StringUtils.isNullOrEmpty(partitionSelected)) {  
    // get matched partitions if set  
    // 读取hoodie.clustering.plan.strategy.partition.regex.pattern配置
    // 获取正则表达式匹配的partition path
    partitionPaths = getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()));  
    // filter the partition paths if needed to reduce list status  
  } else {  
    // 如果配置了partitionSelected,优先这个配置
    partitionPaths = Arrays.asList(partitionSelected.split(","));  
  }  
  // 过滤需要clustering的分区
  // 过滤策略对应配置项hoodie.clustering.plan.partition.filter.mode
  // 可用策略为NONE,RECENT_DAYS,SELECTED_PARTITIONS和DAY_ROLLING
  partitionPaths = filterPartitionPaths(partitionPaths);  
  LOG.info("Scheduling clustering partitionPaths: " + partitionPaths);  

  // 如果所有的分区都被排除了,返回空
  if (partitionPaths.isEmpty()) {  
    // In case no partitions could be picked, return no clustering plan  
    return Option.empty();  
  }  

  // 排除掉分区中已经要做clustering的file group(pending状态)
  // 筛选出小文件
  // 决定小文件判断阈值的配置项为hoodie.clustering.plan.strategy.small.file.limit
  // 将其映射为HoodieClusteringGroup
  // 映射逻辑后面分析
  List<HoodieClusteringGroup> clusteringGroups = getEngineContext()  
      .flatMap(  
          partitionPaths,  
          partitionPath -> {  
            List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());  
            return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());  
          },  
          partitionPaths.size())  
      .stream()  
      .limit(getWriteConfig().getClusteringMaxNumGroups())  
      .collect(Collectors.toList());  
  
  if (clusteringGroups.isEmpty()) {  
    LOG.warn("No data available to cluster");  
    return Option.empty();  
  }  
  // 构造cluster策略
  HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()  
      .setStrategyClassName(getWriteConfig().getClusteringExecutionStrategyClass())  
      .setStrategyParams(getStrategyParams())  
      .build();  

  // 构造clustering计划
  return Option.of(HoodieClusteringPlan.newBuilder()  
      .setStrategy(strategy)  
      .setInputGroups(clusteringGroups)  
      .setExtraMetadata(getExtraMetadata())  
      .setVersion(getPlanVersion())  
      .setPreserveHoodieMetadata(true)  
      .build());  
}

上面的filterPartitionPaths通过配置的hoodie.clustering.plan.partition.filter.mode过滤出所需的partition。具有有如下选项:

  • NONE: 不过滤,返回所有partition path。
  • RECENT_DAYS: 按照partition path倒序排序。跳过hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions个partition,返回hoodie.clustering.plan.strategy.daybased.lookback.partitions个partition。如果partition path是日期,可以实现过滤出最近N天的数据。
  • SELECTED_PARTITIONS: 获取hoodie.clustering.plan.strategy.cluster.begin.partitionhoodie.clustering.plan.strategy.cluster.end.partition之间的分区。
  • DAY_ROLLING: 每次clustering一部分分区。如果分区的index对24取余等于排期时候当前时间的小时数,则该分区需要clustering。

buildClusteringGroupsForPartition方法将筛选出的file slice按照从小到大排序。然后按照clustering配置的group size和group数量条件,合并为clustering group。

protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {  
  // 获取写入配置
  HoodieWriteConfig writeConfig = getWriteConfig();  
  
  List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();  
  List<FileSlice> currentGroup = new ArrayList<>();  
  
  // Sort fileSlices before dividing, which makes dividing more compact  
  // file slice按照base file大小排序,如果文件不存在,按照最大大小排序
  List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);  
  sortedFileSlices.sort((o1, o2) -> (int)  
      ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())  
          - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));  
  
  long totalSizeSoFar = 0;  
  
  for (FileSlice currentSlice : sortedFileSlices) {  
    // 遍历所有file slice
    // 获取当前file slice的大小,如果文件不存在,获取大小上限
    long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();  
    // check if max size is reached and create new group, if needed.  
    // 如果本次累积的文件大小大于hoodie.clustering.plan.strategy.max.bytes.per.group
    // 并且当前group不为空
    if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {  
      // totalSizeSoFar除以hoodie.clustering.plan.strategy.target.file.max.bytes向上取整
      // 计算出输出组编号
      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());  
      LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "  
          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);  
      // 加入到fileSliceGroups集合中,保存结果
      // 保存了输出组组和输出组编号
      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));  
      // 结果保存之后,清零currentGroup和totalSizeSoFar
      currentGroup = new ArrayList<>();  
      totalSizeSoFar = 0;  
  
      // if fileSliceGroups's size reach the max group, stop loop  
      // 检查file group个数是否超过了hoodie.clustering.plan.strategy.max.num.groups
      // 超过的话退出循环,本次不再处理后面的file slice
      if (fileSliceGroups.size() >= writeConfig.getClusteringMaxNumGroups()) {  
        LOG.info("Having generated the maximum number of groups : " + writeConfig.getClusteringMaxNumGroups());  
        break;  
      }  
    }  
  
    // Add to the current file-group  
    // 加入到当前文件组
    currentGroup.add(currentSlice);  
    // assume each file group size is ~= parquet.max.file.size  
    // 累积大小到totalSizeSoFar变量
    totalSizeSoFar += currentSize;  
  }  
  
  if (!currentGroup.isEmpty()) {  
    // 处理最后一个output group
    // shouldClusteringSingleGroup在下面两个配置项任意一个启用的时候为true
    // 表示只有一个输出文件组的话,也clustering
    // hoodie.clustering.plan.strategy.sort.columns
    // hoodie.clustering.plan.strategy.single.group.clustering.enabled
    if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {  
      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());  
      LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "  
          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);  
      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));  
    }  
  }
  // 构造并返回fileSliceGroups
  return fileSliceGroups.stream().map(fileSliceGroup ->  
    HoodieClusteringGroup.newBuilder()  
        .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))  
        .setNumOutputFileGroups(fileSliceGroup.getRight())  
        .setMetrics(buildMetrics(fileSliceGroup.getLeft()))  
        .build());
}

到此为止clustering计划生成部分分析完毕。

根据执行计划执行clustering

Clustering的执行开始于BaseHoodieWriteClient::cluster
在clustering之前,首先执行preWrite操作。

public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {  
  // 创建hudi table,根据引擎(Spark/Flink)和表类型(MOR/COW)的不同,有多种实现类
  HoodieTable table = createTable(config, context.getHadoopConf().get());
  // 执行写入前操作,包含:
  // inflight和requested instant去掉本次instant
  // 启动clean和archive服务(如果开启的话)
  preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
  // 执行clutering
  return tableServiceClient.cluster(clusteringInstant, shouldComplete);
}

接着是BaseHoodieTableServiceClient::cluster方法。该方法检测当前clustering是否已经pending,配置监控,执行clustering并返回clustering执行结果元数据。

public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {  
  // 同上个方法,获取table
  HoodieTable<?, I, ?, T> table = createTable(config, context.getHadoopConf().get());  
  HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();  
  HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);  
  // 检查本次cluster是否已经pending状态。如果是,需要回滚
  if (pendingClusteringTimeline.containsInstant(inflightInstant)) {  
    table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));  
    table.getMetaClient().reloadActiveTimeline();  
  }  
  // cluster时长计时器监控
  clusteringTimer = metrics.getClusteringCtx();  
  LOG.info("Starting clustering at " + clusteringInstant);  
  // 调用table的cluster服务
  HoodieWriteMetadata<T> writeMetadata = table.cluster(context, clusteringInstant);
  // 转换metadata到对应计算引擎格式
  HoodieWriteMetadata<O> clusteringMetadata = convertToOutputMetadata(writeMetadata);  
  // Validation has to be done after cloning. if not, it could result in referencing the write status twice which means clustering could get executed twice.  
  // 检查cluster写入状态不能为空
  validateClusteringCommit(clusteringMetadata, clusteringInstant, table);  
  
  // Publish file creation metrics for clustering.  
  // 读取并返回监控信息
  if (config.isMetricsOn()) {  
    clusteringMetadata.getWriteStats()  
        .ifPresent(hoodieWriteStats -> hoodieWriteStats.stream()  
            .filter(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats() != null)  
            .map(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats().getTotalCreateTime())  
            .forEach(metrics::updateClusteringFileCreationMetrics));  
  }  
  
  // TODO : Where is shouldComplete used ?  
  if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {  
    completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant, Option.ofNullable(convertToWriteStatus(writeMetadata)));  
  }  
  return clusteringMetadata;  
}

以Spark为例,我们查看COW表的HoodieSparkCopyOnWriteTable::cluster逻辑。

public HoodieWriteMetadata<HoodieData<WriteStatus>> cluster(HoodieEngineContext context,  
                                                         String clusteringInstantTime) {  
  return new SparkExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute();  
}

此逻辑交由SparkExecuteClusteringCommitActionExecutor执行。继续分析SparkExecuteClusteringCommitActionExecutor::execute方法,它调用了BaseCommitActionExecutor::executeClustering方法。

@Override  
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {  
  return executeClustering(clusteringPlan);  
}

BaseCommitActionExecutor::executeClustering该方法反射加载hoodie.clustering.execution.strategy.class配置项对应的clustering策略(默认为SparkSortAndSizeExecutionStrategy),然后执行clustering。

protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {  
  // 创建instant
  HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);  
  // Mark instant as clustering inflight  
  // 标记instant为inflight状态
  table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());  
  table.getMetaClient().reloadActiveTimeline();  
  
  // Disable auto commit. Strategy is only expected to write data in new files.  
  // 禁用自动commit
  config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, Boolean.FALSE.toString());  

  // 添加_hoodie_commit_time等5个元数据字段到schema中
  final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));  
  // 加载hoodie.clustering.execution.strategy.class配置项对应的clustering策略类
  // 执行它的performClustering方法
  // 对于默认的配置,clustering策略类为SparkSortAndSizeExecutionStrategy
  HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = (  
      (ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)  
          ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),  
              new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))  
      .performClustering(clusteringPlan, schema, instantTime);  
  // 获取写入状态
  HoodieData<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();  
  // 更新表索引,更新数据所在位置
  HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);  
  // 持久化保存
  statuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE), context, HoodieData.HoodieDataCacheKey.of(config.getBasePath(), instantTime));  
  // triggers clustering.  
  // 更新writeMetadata中的writestats
  writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());  
// 获取clustering操作的数据文件file id和partition path
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata)); 
// 提交修改的writeMetadata,clustering对后续操作生效
  commitOnAutoCommit(writeMetadata);  
  if (!writeMetadata.getCommitMetadata().isPresent()) {  
    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),  
        extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());  
    writeMetadata.setCommitMetadata(Option.of(commitMetadata));  
  }  
  return writeMetadata;  
}

Clustering的执行细节位于策略类中。我们这里分析默认的策略类SparkSortAndSizeExecutionStrategy::performClustering方法。该方法位于父类MultipleSparkJobExecutionStrategy::performClustering中。该方法使用线程池,一个线程处理一个input group(对应执行计划中提到的clustering group),但线程数不能超过配置的最大值。

@Override  
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {  
  JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());  
  // 是否保留元数据,默认为true
  boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);  
  // 使用专门的线程执行clustering。创建clustering线程池
  // 取InputGroups数量(plan中clustering生成file group的数量)
  // 最大值为hoodie.clustering.max.parallelism,最大值默认15
  ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(  
      Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),  
      new CustomizedThreadFactory("clustering-job-group", true));  
  try {  
    // execute clustering for each group async and collect WriteStatus  
    // 在线程池中执行clustering,获取执行结果
    Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(  
            clusteringPlan.getInputGroups().stream()  
                .map(inputGroup -> {  
 // hoodie.datasource.write.row.writer.enable如果为true,使用Spark原生的Row类型,避免类型转换引发的额外代价
                  if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true)) {  
                    return runClusteringForGroupAsyncAsRow(inputGroup,  
                        clusteringPlan.getStrategy().getStrategyParams(),  
                        shouldPreserveMetadata,  
                        instantTime,  
                        clusteringExecutorService);  
                  }  
                  return runClusteringForGroupAsync(inputGroup,  
                      clusteringPlan.getStrategy().getStrategyParams(),  
                      shouldPreserveMetadata,  
                      instantTime,  
                      clusteringExecutorService);  
                })  
                .collect(Collectors.toList()))  
        .join()  
        .stream();  
    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));  
    JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);  
  
    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();  
    writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));  
    return writeMetadata;  
  } finally {  
    clusteringExecutorService.shutdown();  
  }  
}

我们继续分析默认配置的执行路线ClusteringPlanActionExecutor::runClusteringForGroupAsyncAsRow。该方法获取到所有需要clustering的数据到Spark的dataset,读取表schema和各个file id从属的partition path的对应关系。然后执行clustering。

private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup,  
                                                                                   Map<String, String> strategyParams,  
                                                                                   boolean shouldPreserveHoodieMetadata,  
                                                                                   String instantTime,  
                                                                                   ExecutorService clusteringExecutorService) {  
  return CompletableFuture.supplyAsync(() -> {  
    // 获取spark context
    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());  
    // 转换所有clustering涉及到的数据为Spark DataSet
    Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);  
    // 获取带有元数据字段的schema
    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));  
    // 转换clustering的file slice为HoodieFileGroupId
    // 保存的是partition path和file id的对应关系
    List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()  
        .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))  
        .collect(Collectors.toList());  
    // 执行clustering
    return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, shouldPreserveHoodieMetadata,  
        clusteringGroup.getExtraMetadata());  
  }, clusteringExecutorService);  
}

SparkSortAndSizeExecutionStrategy::performClusteringWithRecordsAsRow方法获取分区器,将数据重新排序,最后使用批量插入的方式,写回parquet文件。

@Override  
public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords,  
                                                                 int numOutputGroups,  
                                                                 String instantTime, Map<String, String> strategyParams,  
                                                                 Schema schema,  
                                                                 List<HoodieFileGroupId> fileGroupIdList,  
                                                                 boolean shouldPreserveHoodieMetadata,  
                                                                 Map<String, String> extraMetadata) {  
  LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);  
  // 生成写入配置,clustering输出多少个file group就配置多少个bulk insert并行度
  HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()  
      .withBulkInsertParallelism(numOutputGroups)  
      .withProps(getWriteConfig().getProps()).build();  
  // 配置最大parquet文件大小为clustering目标文件最大上限
  // 对应配置项为hoodie.clustering.plan.strategy.target.file.max.bytes
  newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));  

  // 获取分区器
  BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);  
  // 使用分区器分区数据(数据重新排序)
  Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);  
  // 将重排序之后的数据批量插入
  return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,  
      partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);  
}

接下来分析的重点是clustering的另一个功能:将数据重排序。因此重点是分区器和分区器重排序的逻辑。获取分区器的逻辑位于它的父类MultipleSparkJobExecutionStrategy::getRowPartitioner中。代码如下:

private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams,  
                                                    Schema schema,  
                                                    boolean isRowPartitioner) { 
  // 获取排序字段配置项
  // 对应的配置项为hoodie.clustering.plan.strategy.sort.columns 
  // 使用逗号分隔
  Option<String[]> orderByColumnsOpt =  
      Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))  
          .map(listStr -> listStr.split(","));  
  
  return orderByColumnsOpt.map(orderByColumns -> {  
    // 获取hoodie.layout.optimize.strategy配置,字段可使用zorder或者hilbert曲线排序或者linear线性排序
    HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();  
    switch (layoutOptStrategy) {  
      case ZORDER:  
      case HILBERT:  
        return isRowPartitioner  
            ? new RowSpatialCurveSortPartitioner(getWriteConfig())  
            : new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,  
            getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);  
      case LINEAR:  
        return isRowPartitioner  
            ? new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig())  
            : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());  
      default:  
        throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));  
    }  
  }).orElseGet(() -> isRowPartitioner  
      ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true)  
      : BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true));  
}

对于使用Spark原生Row类型的情况,isRowPartitionertrue。如果使用ZORDER或者HILBERT排序策略,使用RowSpatialCurveSortPartitioner,LINEAR排序策略对应的是RowCustomColumnsSortPartitioner
接下来我们分别分析这两个partitioner是如何对数据重排序的。
首先是RowSpatialCurveSortPartitioner::repartitionRecords,代码如下:

@Override  
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {  
  return reorder(records, outputPartitions);  
}

repartitionRecords调用了reorder方法。

protected Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) { 
  // 检查排序字段配置
  if (orderByColumns.length == 0) {  
    // No-op  
    return dataset;  
  }  
  
  List<String> orderedCols = Arrays.asList(orderByColumns);  
  // curveCompositionStrategyType默认为DIRECT
  switch (curveCompositionStrategyType) {  
    case DIRECT:  
      return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);  
    case SAMPLE:  
      return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);  
    default:  
      throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType));  
  }  
}

SpatialCurveCompositionStrategyType中SAMPLE的数据排序分布效果较DIRECT更好,但是执行速度更慢。默认配置的是DIRECT类型。
接下来分析DIRECT类型处理方式,对应的是SpaceCurveSortingHelper::orderDataFrameByMappingValues。该方法首先判断排序字段配置的合法性。然后将数据按照排序字段,使用Z曲线或者是Hilbert曲线重排序。

public static Dataset<Row> orderDataFrameByMappingValues(  
    Dataset<Row> df,  
    HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,  
    List<String> orderByCols,  
    int targetPartitionCount  
) {  
  // 获取字段名称和StructField的对应关系
  Map<String, StructField> columnsMap =  
      Arrays.stream(df.schema().fields())  
          .collect(Collectors.toMap(StructField::name, Function.identity()));  
  // 检查排序字段是否出现在schema中
  List<String> checkCols =  
      orderByCols.stream()  
          .filter(columnsMap::containsKey)  
          .collect(Collectors.toList());  
  // 如果没有,说明排序字段配置有误,跳过不再继续执行
  if (orderByCols.size() != checkCols.size()) {  
    LOG.error(String.format("Trying to ordering over a column(s) not present in the schema (%s); skipping", CollectionUtils.diff(orderByCols, checkCols)));  
    return df;  
  }  
  
  // In case when there's just one column to be ordered by, we can skip space-curve  
  // ordering altogether (since it will match linear ordering anyway) 
  // 如果排序字段只有一个,没必要使用空间曲线方式排序,直接使用Spark排序
  if (orderByCols.size() == 1) {  
    String orderByColName = orderByCols.get(0);  
    LOG.debug(String.format("Single column to order by (%s), skipping space-curve ordering", orderByColName));  
  
    // TODO validate if we need Spark to re-partition  
    return df.repartitionByRange(targetPartitionCount, new Column(orderByColName));  
  }  
  // 字段个数
  int fieldNum = df.schema().fields().length;  

  // 返回排序字段对应的index和字段信息对应关系
  Map<Integer, StructField> fieldMap =  
      orderByCols.stream()  
          .collect(  
              Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), columnsMap::get));  
  
  JavaRDD<Row> sortedRDD;  
  // 根据布局优化策略,排序RDD
  switch (layoutOptStrategy) {  
    case ZORDER:  
      sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);  
      break;  
    case HILBERT:  
      sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);  
      break;  
    default:  
      throw new UnsupportedOperationException(String.format("Not supported layout-optimization strategy (%s)", layoutOptStrategy));  
  }  
  
  // Compose new {@code StructType} for ordered RDDs  
  // 为排序后的RDD创建StructType(schema)
  StructType newStructType = composeOrderedRDDStructType(df.schema());  

  // 返回dataset
  return df.sparkSession()  
      .createDataFrame(sortedRDD, newStructType)  
      .drop("Index");  
}

我们先看第一种情况,Z曲线排序。方法位于SpaceCurveSortingHelper::createZCurveSortedRDD
该方法将多个排序字段的值映射为8字节内容(多的截取少的补充),然后每个字段的字节内容各取一位拼接到一起,然后再各取第二位拼接……一直循环,这个步骤称之为二进制数据交织(interleaving)。将交织之后的值作为一个字段,拼接在数据中。然后按照该字段的内容排序。

private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {  
  return originRDD.map(row -> {  
    // 将数据中每个排序字段的值填充为8字节内容
    // 多的截取少的补充
    byte[][] zBytes = fieldMap.entrySet().stream()  
      .map(entry -> {  
        int index = entry.getKey();  
        StructField field = entry.getValue();  
        return mapColumnValueTo8Bytes(row, index, field.dataType());  
      })  
      .toArray(byte[][]::new);  

    // Interleave received bytes to produce Z-curve ordinal  
    // 将这些排序字段的值交织起来
    // 比如有A,B两个排序字段。A字段值取1位,然后取B字段值1位,然后A再取下一位,B取下一位,以此类推
    byte[] zOrdinalBytes = BinaryUtil.interleaving(zBytes, 8);  
    // 追加zOrdinalBytes到Row
    return appendToRow(row, zOrdinalBytes);  
  })  
    // 按照该字段的值(zOrdinalBytes,位于row的末尾,index正好是fieldNum)排序
    .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);  
}

第二种情况为Hilbert曲线,对应方法为SpaceCurveSortingHelper::createHilbertSortedRDD。和ZOrder曲线排序处理逻辑基本相同,只是将Z曲线替换成了Hilbert曲线。

private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {  
  // NOTE: Here {@code mapPartitions} is used to make sure Hilbert curve instance is initialized  
  //       only once per partition  
  return originRDD.mapPartitions(rows -> {  
    // 创建hilbert fieldMap个数维度曲线
    HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());  
    return new Iterator<Row>() {  
  
      @Override  
      public boolean hasNext() {  
        return rows.hasNext();  
      }  
  
      @Override  
      public Row next() {  
        Row row = rows.next();  
        // 将row中的排序字段值映射为long类型
        long[] longs = fieldMap.entrySet().stream()  
            .mapToLong(entry -> {  
              int index = entry.getKey();  
              StructField field = entry.getValue();  
              return mapColumnValueToLong(row, index, field.dataType());  
            })  
            .toArray();  
  
        // Map N-dimensional coordinates into position on the Hilbert curve  
        // 使用hilbert曲线索引上面的long值,结果作为后面的排序依据
        byte[] hilbertCurvePosBytes = HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63);  
        return appendToRow(row, hilbertCurvePosBytes);  
      }  
    };  
  })  
      .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);  
}

和空间曲线的方式相比,LINEAR线性排序显得较为简单。代码位于RowCustomColumnsSortPartitioner::repartitionRecords
。通过spark的sort算子按照配置的column排序。

@Override  
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {  
  return records  
      .sort(Arrays.stream(sortColumnNames).map(Column::new).toArray(Column[]::new))  
      .coalesce(outputSparkPartitions);  
}

到这里为止分区器的逻辑分析完毕。

参考文献

Clustering | Apache Hudi

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容