背景

在 IoTDB 中,聚合查询可以使用 GROUP BY 子句指定按照时间区间分段聚合。用户可以指定聚合的时间间隔和滑动步长,相关参数如下:

  • 参数 1:整体的时间窗口。
  • 参数 2(interval):划分时间轴的时间间隔参数(> 0)。
  • 参数 3(slidingStep):滑动步长(可选,默认值与时间间隔相同)。

img

在之前的定义中,滑动步长必须大于等于时间间隔,现在拟支持滑动步长小于时间间隔的情况。

设计思路

  • 根据时间间隔和滑动步长将整体的时间窗口划分成若干不重叠的预聚合窗口
  • 先对每个预聚合窗口内的数据进行聚合(调用之前的接口即可),将这些预聚合值维护在一个队列中并实时更新聚合结果。
  • 当聚合窗口内的所有预聚合窗口的聚合值都计算完毕时,此时得到的聚合结果即该聚合窗口的聚合结果。
  • 聚合窗口移动时,同时更新队列,将不在聚合窗口内的预聚合值出队。

算法过程

预聚合窗口划分

  • 若 interval < slidingStep,预聚合窗口等于聚合窗口。
  • 若 interval >= slidingStep,预聚合窗口由各个聚合窗口的边界划分而成。

例:整体时间窗口为 [0, 32),数据如下,灰色单元格表示对应时间戳没有数据。

  • interval = slidingStep = 4,预聚合窗口即为聚合窗口。

  • interval = 4,slidingStep = 5,预聚合窗口即为聚合窗口。

  • interval = 4,slidingStep = 6,预聚合窗口即为聚合窗口。

  • interval = 4,slidingStep = 1,预聚合窗口大小为 1。

  • interval = 4,slidingStep = 2,预聚合窗口大小为 2。

  • interval = 4,slidingStep = 3,预聚合窗口大小为 3 -> 1 -> 2 -> 1 -> 2 -> ……(为方便起见,简化为 1 -> 2 -> 1 -> 2 -> ……)

可以推导出预聚合窗口关于 interval 和 slidingStep 的关系(感谢 Xiangwei Wei 贡献的精彩公式推导~):

  • 如果 interval  能被 slidingStep 整除,则窗口大小为 slidingStep
  • 否则:
    • 首先有 interval / slidingStep 向下取整个大小为 slidingStep 的窗口。
    • 后面循环出现大小为 interval % slidingStepslidingStep - interval % slidingStep 的窗口。
    • 为了方便处理,我们忽略开始的若干个大小为 slidingStep 的窗口, 将整个时间范围循环窗口大小 interval % slidingStepslidingStep - interval % slidingStep 拆分预聚合窗口。


根据预聚合结果计算聚合值

对于不同的聚合函数,按照其特点有不同的处理方式,可以分为三组:

  • SUM、COUNT、AVG:将当前聚合窗口内的预聚合值缓存在队列中,更新队列时同时更新聚合值。
  • MAX_VALUE、MIN_VALUE、EXTREME:使用单调队列算法进行优化(LEETCODE-239)。
  • FIRST_VALUE、LAST_VALUE、MAX_TIME、MIN_TIME:顺序遍历时,LAST_VALUE 和 MAX_TIME 的聚合值总出现在右边界的预聚合值,因此中间结果无需缓存。反之,逆序遍历时,FIRST_VALUE 和 MIN_TIME 的预聚合值无需缓存。

时间复杂度:O(n + m),m 为预聚合窗口的数量。

空间复杂度:O(p + k),p 为缓存的 BatchData 大小,k 为每个聚合窗口中预聚合窗口的数量。

例:整体时间窗口为 [0, 32),序列 root.sg1.d1.s1 的数据情况如下,灰色单元格表示对应时间戳没有数据,临近相同颜色的单元格表示来自同一个 BatchData。

执行:SELECT sum(s1), max_value(s1), last_value(s1), min_time(s1) FROM root.sg1.d1 GROUP BY ([0,32),8ms,2ms)

解析 SQL 后得到参数,startTime = 0,endTime = 32,interval = 8,slidingStep = 2。计算得到每个预聚合窗口大小为 2。

下面演示计算过程:

首先计算第一个预聚合窗口 [0,2) 的聚合值,将计算结果(除 LAST_VALUE 外)缓存在队列中,并更新聚合结果。

继续计算第二个预聚合窗口 [2,4) 的聚合值,更新队列和聚合结果:

  • SUM、MIN_TIME:直接入队,更新聚合结果
  • MAX_VALUE:由于 -1 < 3,将结果入队,更新聚合结果
  • LAST_VALUE:直接更新聚合结果

继续计算第三个预聚合窗口 [4,6) 的聚合值,更新队列和聚合结果:

  • SUM、MIN_TIME:直接入队,更新聚合结果
  • MAX_VALUE:由于 5 > 3 > -1,将 5 入队,3 和 -1 出队,更新聚合结果
  • LAST_VALUE:直接更新聚合结果

继续计算第四个预聚合窗口 [6,8) 的聚合值,更新队列和聚合结果:

  • SUM、MIN_TIME:直接入队,更新聚合结果
  • MAX_VALUE:由于 6 > 5,将 6 入队,5 出队,更新聚合结果
  • LAST_VALUE:直接更新聚合结果

此时得到了第一个聚合窗口的聚合结果。

聚合窗口向前滑动,将队列中不在聚合窗口范围内的预聚合值出队,并更新聚合结果。

继续计算第五个预聚合窗口 [8,10) 的聚合值,更新队列和聚合结果:

  • SUM、MIN_TIME:直接入队,更新聚合结果
  • MAX_VALUE:由于 7 > 6,将 7 入队,6 出队,更新聚合结果
  • LAST_VALUE:直接更新聚合结果

此时得到了第二个聚合窗口的聚合结果。以此类推~

实现细节

  • 时间窗口管理: 为方便管理时间窗口,可以实现一个工具类 TimeRangeIterator 迭代预聚合窗口。
  • 预聚合值计算:
    • 不带值过滤:比较简单,每个预聚合窗口的聚合值计算可以直接调用 GroupByExecutor 接口的 List<AggregateResult> calcResult(long curStartTime, long curEndTime) 得到。
    • 带值过滤:考虑梳理出一个类似于不带值过滤的计算接口。
  • 预聚合值处理:GroupByEngineDataSet 类中维护队列,完善 nextWithoutConstraint() 的执行逻辑。
  • 对于不同类型的聚合函数的处理: 实现数据结构 SlidingWindowAggrQueue,根据聚合函数类型对队列进行不同策略的管理。
  • 内存管理: 在最差情况下,队列中需要缓存窗口内全部的原始数据,对内存造成压力。这里参考 UFTD 框架 的内存管理 —— 实现 ElasticSerializableQueue 作为 SlidingWindowAggrQueue 内缓存值的队列。
  • 兼容现有 GROUP BY: interval < slidingStep,预聚合窗口等于聚合窗口,每次无需缓存,直接返回预聚合结果即可。
  • 兼容 UDAF: 在 UDAF 中,特别地,有返回一个窗口内全部原始数据的需求(如计算方差)。此时可以将预聚合窗口大小设置为 1,增加表示返回原始数据的聚合类型(如 original),此时队列中保存的就是窗口内全部的原始数据。
  • No labels

1 Comment