Kylin 4.0 TopN实现原理介绍

引言

Apache Kylin 是一个开源的分布式分析引擎,提供 Hadoop 之上的 SQL 查询接口及多维分析(OLAP)能力以支持超大规模数据。它能在亚秒内查询巨大的数据集 。

从Kylin 1.5开始就已经加入了TopN的度量,一直到kylin 3.x,实现上没有太大改变,想了解kylin3之前的TopN实现原理可以参考下面的文章:

https://www.infoq.cn/article/2016/08/Apache-Kylin-Top-N/?utm_source=tuicool

在2020年9月份Apache Kylin社区发布了Kylin 4.0.0-alpha版本,本文将详细介绍 Apache Kylin 4.0.0-alpha 中TopN 的实现。

背景

我们先从一个典型的TopN应用场景入手,在电商平台做数据分析的时候我们想要获取可能会经常需要查看销售额靠前100的卖家是哪些,SQL查询示例如下:

SELECT kylin_sales.part_dt, seller_id

FROM kylin_sales

GROUP BY

kylin_sales.part_dt, kylin_sales.seller_id

ORDER BY SUM(kylin_sales.price) desc LIMIT 100;

在大数据量的场景下,想要求TopN的数据,如果先group by后再计算出所有的sum(price),然后再对sum(price)进行排序,这里总的计算开销非常大的。

image

TopN介绍

通过对Kylin 3.x的TopN实现原理的介绍,我们知道Kylin 3及之前版本的TopN使用了Space-Saving的算法,并在此之上做了优化,代码实现可以查看org.apache.kylin.measure.topn.TopNCounter 。
Kylin 4.0继续使用了Space-Saving的算法,并在Kylin 3.x的TopNCounter的基础上做了优化,不过同样的当前TopN也是存在误差的,这些在后面会有详细介绍。

TopN实现

当前Kylin4的TopN UDAF注册是在org.apache.kylin.engine.spark.job.CuboidAggregator#aggInternal, 代码如下:

def aggInternal(ss: SparkSession,
                  dataSet: DataFrame,
                  dimensions: util.Set[Integer],
                  measures: util.Map[Integer, FunctionDesc],
                  isSparkSql: Boolean): DataFrame = {
      //省略
      measure.expression.toUpperCase(Locale.ROOT) match {
        //省略
        case "TOP_N" =>
          // Uses new TopN aggregate function
          // located in kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
          val schema = StructType(measure.pra.map { col =>
            val dateType = col.dataType
            if (col == measure) {
              StructField(s"MEASURE_${col.columnName}", dateType)
            } else {
              StructField(s"DIMENSION_${col.columnName}", dateType)
            }
          })

          if (reuseLayout) {
            new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
              .toAggregateExpression()).as(id.toString)
          } else {
            new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr, columns.drop(1).map(_.expr))
              .toAggregateExpression()).as(id.toString)
          }
       //省略
        case _ =>
          max(columns.head).as(id.toString)
      }
    }.toSeq
//省略
    if (reuseLayout) {
      val columns = NSparkCubingUtil.getColumns(dimensions) ++ measureColumns(dataSet.schema, measures)
      df.select(columns: _*)
    } else {
      df
    }
  }

其实TopN最初的实现的在org.apache.kylin.engine.spark.job.TopNUDAF,但是可以看到目前TopN的实现是在org.apache.spark.sql.udaf.BaseTopN.scala,最新的实现主要针对旧的实现修复了性能问题,详情可以查看KYLIN-4760

Kylin 4.0的TopN是通过Spark UDAF的方式实现的,以下是实现类接口之间的关系,可以看到最终实现的是BaseTopN,继承的是TypedImperativeAggregate。然后BaseTopN又有两个子类,分别是EncodeTopN和ReuseTopN,当从平表(FlatTable)开始构建的时候,FlatTable中没有构建过TopN,这里会调用EncodeTopN,再次之后从已经构建好的cuboid构建下一层cuboid的时候会调用ReuseTopN,避免重复计算,接口关系图如下:

image

继承TypedImperativeAggregate实现TopN,而不是UserDefineAggregateFunction主要是因为UserDefinedAggregateFunction 是把 catalyst 内部 internalRow 类型转换为了 Row 类型,然后使用用户自己的 update 方法处理,然后TypedImperativeAggregate需要自己做序列化、反序列化处理,少了一层转换。

TopNCounter介绍

前面提到Space-Saving算法是在TopNCounter中实现的,此处我们对TopNCounter的实现进行一个简要的介绍。BaseTopN对象初始化的时候会创建TopNCounter对象,用户保存计算过程中符合TopN条件的行,对应于Spark UDAF的概念是aggregate buffer。update,merge,eval都是处理的TopNCounter。TopNCounter在初始化的时候需要指定容量, 大小建议为N * TopNCounter.EXTRA_SPACE_RATE, 其中N为TopN定义的大小,EXTRA_SPACE_RATE为建议额外空间调整参数,默认为10, 也就是说如果定义的topn(10,4), 那么TopNCounter的初始化大小则为10 * 10 = 100 。

TopN的处理流程可以见下图:

image

update()主要将传入的行通过TopNCounter.offer() 将一行的内容插入到TopNCounter对象中,merge则是对两个经过update()操作的group进行去重合并,最后在eval()的时候调用TopNCounter.sortAndRetain()来排序和调整TopNCounter大小,最终得到聚合结果。

存储

Kylin 4.0目前使用的是parquet进行存储,我们定义topn(10,4), TopNCounter.EXTRA_SPACE_RATE 设置为1。cuboid中维度和度量列明的映射关系为:

0 -> seller_id

1 -> item_id

2 -> id

3 -> price

4 -> Count

5 -> TopN

如下是只有TopN和只有SUM的cuboid内容:

image

值得注意的是第二行,Count为11,但是实际上TopN列只存储了10个值,这是因为TopNCounter的容量只有10 * EXTRA_SPACE_RATE = 10, 超过10的内容不会被存储,这也是当前TopN存在误差的原因所在。可以看到TopN将计算的维度和group by的维度放到了一起,然后用数组的形式进行存储。

image

对于sum度量,kylin则是直接存储的sum后的聚合值。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351