THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!

Apache Kylin : Analytical Data Warehouse for Big Data

Page tree

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

引言

Apache Kylin 是一个开源的分布式分析引擎,提供 Hadoop 之上的 SQL 查询接口及多维分析(OLAP)能力以支持超大规模数据。它能在亚秒内查询巨大的数据集 。

...

在2020年9月份Apache Kylin社区发布了Kylin 4.0.0-alpha版本,本文将详细介绍 Apache Kylin 4.0.0-alpha 中TopN 的实现。

背景

我们先从一个典型的TopN应用场景入手,在电商平台做数据分析的时候我们想要获取可能会经常需要查看销售额靠前100的卖家是哪些,SQL查询示例如下:

...

在大数据量的场景下,想要求TopN的数据,如果先group by后再计算出所有的sum(price),然后再对sum(price)进行排序,这里总的计算开销非常大的。

TopN介绍

通过对Kylin 3.x的TopN实现原理的介绍,我们知道Kylin 3及之前版本的TopN使用了Space-Saving的算法,并在此之上做了优化,代码实现可以查看org.apache.kylin.measure.topn.TopNCounter 。

Kylin 4.0继续使用了Space-Saving的算法,并在Kylin 3.x的TopNCounter的基础上做了优化,不过同样的当前TopN也是存在误差的,这些在后面会有详细介绍。

接口

Kylin 4.0的TopN是通过Spark UDAF的方式实现的,以下是实现类接口之间的关系,可以看到最终实现的是BaseTopN,继承的是TypedImperativeAggregate。然后BaseTopN又有两个子类,分别是EncodeTopN和ReuseTopN,当从平表(FlatTable)开始构建的时候,FlatTable中没有构建过TopN,这里会调用EncodeTopN,再次之后从已经构建好的cuboid构建下一层cuboid的时候会调用ReuseTopN,避免重复计算,接口关系图如下:

...

继承TypedImperativeAggregate实现TopN,而不是UserDefineAggregateFunction主要是因为UserDefinedAggregateFunction 是把 catalyst 内部 internalRow 类型转换为了 Row 类型,然后使用用户自己的 update 方法处理,然后TypedImperativeAggregate需要自己做序列化、反序列化处理,少了一层转换。

TopNCounter介绍

前面提到Space-Saving算法是在TopNCounter中实现的,此处我们对TopNCounter的实现进行一个简要的介绍。BaseTopN对象初始化的时候会创建TopNCounter对象,用户保存计算过程中符合TopN条件的行,对应于Spark UDAF的概念是aggregate buffer。update,merge,eval都是处理的TopNCounter。TopNCounter在初始化的时候需要指定容量, 大小建议为N * TopNCounter.EXTRA_SPACE_RATE, 其中N为TopN定义的大小,EXTRA_SPACE_RATE为建议额外空间调整参数,默认为10, 也就是说如果定义的topn(10,4), 那么TopNCounter的初始化大小则为10 * 10 = 100 。

...

update()主要将传入的行通过TopNCounter.offer() 将一行的内容插入到TopNCounter对象中,merge则是对两个经过update()操作的group进行去重合并,最后在eval()的时候调用TopNCounter.sortAndRetain()来排序和调整TopNCounter大小,最终得到聚合结果。

存储

Kylin 4.0目前使用的是parquet进行存储,我们定义topn(10,4), TopNCounter.EXTRA_SPACE_RATE 设置为1。cuboid中维度和度量列明的映射关系为:

...