Problem
When spark-sql runs a query whose predicate has the form partition BETWEEN 'start' AND 'end' OR (partition = 'other' AND column <> 'value'), the job ends up reading data from every partition of the table instead of pruning down to the partitions named in the condition.
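As a sketch of the query shape that triggers this (all names below are made up: demo_db.demo_table is a hypothetical Hive table, dt its partition column, status an ordinary data column):

import org.apache.spark.sql.SparkSession

// Hypothetical reproduction only: demo_db.demo_table and its columns do not
// refer to a real table; dt is the partition column, status a regular column.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val df = spark.sql(
  """SELECT *
    |FROM demo_db.demo_table
    |WHERE dt BETWEEN '20240101' AND '20240107'
    |   OR (dt = '20240201' AND status <> 'deleted')
  """.stripMargin)
// Triggering execution shows the problem: with the built-in
// PruneFileSourcePartitions rule the whole OR predicate is classified as a data
// filter, so files from every partition of the table are listed and scanned.
df.count()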
Solution
As mentioned earlier, when reading a Hive partitioned table with spark-sql we relied on the methods of PredicateHelper, but had to add a new splitPredicates method to it, because PredicateHelper only provides splitConjunctivePredicates and splitDisjunctivePredicates:
protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
  condition match {
    case And(cond1, cond2) =>
      splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
    case other => other :: Nil
  }
}

protected def splitDisjunctivePredicates(condition: Expression): Seq[Expression] = {
  condition match {
    case Or(cond1, cond2) =>
      splitDisjunctivePredicates(cond1) ++ splitDisjunctivePredicates(cond2)
    case other => other :: Nil
  }
}
In the PhysicalOperation extractor, the Filter node is handled only as follows.
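The relevant case from collectProjectsAndFilters looks roughly like this (an approximate excerpt of the Spark 2.x source; the exact shape may differ slightly between versions, and substitute rewrites attributes through aliases collected from child Project nodes):

// Inside PhysicalOperation.collectProjectsAndFilters (approximate excerpt).
case Filter(condition, child) if condition.deterministic =>
  val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
  val substitutedCondition = substitute(aliases)(condition)
  (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)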
As you can see, when the Filter tree is unwound only splitConjunctivePredicates is called, i.e. only AND expressions are split apart. For the problem query the entire OR expression therefore comes back as a single predicate, and because one of its branches also references a non-partition column, its references are not a subset of the partition columns and it is never treated as a partition filter.
PruneFileSourcePartitions matches on PhysicalOperation, and the filters it receives are exactly the result of the Filter handling in collectProjectsAndFilters described above:
private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case op @ PhysicalOperation(projects, filters,
        logicalRelation @
          LogicalRelation(fsRelation @
            HadoopFsRelation(catalogFileIndex: CatalogFileIndex, partitionSchema, _, _, _, _), _, _))
        if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
Below is the original code in PruneFileSourcePartitions. Replacing it with the partition-expression extraction code from the earlier section on filtering by a specified number of partitions is enough to fix the problem above.
val sparkSession = fsRelation.sparkSession
val partitionColumns =
  logicalRelation.resolve(
    partitionSchema, sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
Modified code:
val partitionColumns =
  logicalRelation.resolve(
    partitionSchema, sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters = splitPredicates(normalizedFilters.reduceLeft(And), partitionSet)
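The splitPredicates helper itself comes from the earlier article. For completeness, here is a minimal sketch of what such a method could look like, assuming it lives next to the two existing methods in PredicateHelper, keeps only sub-predicates that reference partition columns exclusively, and preserves an OR only when both branches still contain a partition predicate; the names and exact return type are assumptions, not the original implementation:

import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, Or}

// Sketch only: the real splitPredicates from the earlier article may differ.
protected def splitPredicates(
    condition: Expression,
    partitionSet: AttributeSet): ExpressionSet = {

  // Returns the partition-only part of `cond`, or Nil if nothing usable remains.
  def split(cond: Expression): Seq[Expression] = cond match {
    case And(left, right) =>
      // Every partition-only conjunct can prune partitions on its own.
      split(left) ++ split(right)
    case Or(left, right) =>
      val l = split(left)
      val r = split(right)
      // An OR can be used for pruning only if *both* branches still contain a
      // partition predicate; otherwise pruning on one branch alone would drop
      // rows that the other branch matches.
      if (l.nonEmpty && r.nonEmpty) Seq(Or(l.reduceLeft(And), r.reduceLeft(And))) else Nil
    case other if other.references.subsetOf(partitionSet) =>
      Seq(other)
    case _ =>
      Nil
  }

  ExpressionSet(split(condition))
}

With this kind of splitting, the problem predicate partition BETWEEN 'start' AND 'end' OR (partition = 'other' AND column <> 'value') is weakened to partition BETWEEN 'start' AND 'end' OR partition = 'other', which references only the partition column, so it can be handed to CatalogFileIndex.filterPartitions and only the matching partitions are listed instead of the whole table.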