GroupByFill 查询的主要逻辑在 GroupByFillDataSet
org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet
GroupByFill 是对原降采样结果进行填充,支持使用 IoTDB 现有的全部聚合函数和空值填充方式,使用时需要注意以下两点:
GroupByFill 在任何情形下都不会填充 count 的聚合结果,因为在 IoTDB 中,若某个查询区间不存在任何数据,count 的聚合结果为 0
GroupByFill 会分类处理 sum 的聚合结果,在 IoTDB 中,若某个查询区间不存在任何数据,sum 的聚合结果为 null,将被 GroupByFill 填充;若某个查询区间内 sum 的聚合结果恰好为 0,那么 GroupByFill 不会填充这个值
PREVIOUSUNTILLAST 与 PREVIOUS 填充的区别
Previous 填充方式的语意没有变,只要前面有值,就可以拿过来填充; PREVIOUSUNTILLAST 考虑到在某些业务场景下,所填充的值的时间不能大于该时间序列 last 的时间戳(从业务角度考虑,取历史数据不能取未来历史数据) 看下面的例子,或许更容易理解
A 点时间戳为 1,B 为 5,C 为 20,D 为 30,N 为 8,M 为 38
原始数据为
select temperature FROM root.ln.wf01.wt01 where time >= 1 and time <= 38
Time | root.ln.wf01.wt01.temperature |
---|---|
1 | 21 |
3 | 23 |
5 | 25 |
20 | 26 |
27 | 29 |
28 | 30 |
30 | 40 |
当我们使用 Previous 插值方式时,即使 D 到 M 这一段是未来的数据,我们也会用 D 点的数据进行填充
SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[previous])
Time | last_temperature |
---|---|
8 | 25 |
13 | 25 |
18 | 26 |
23 | 29 |
28 | 40 |
33 | 40 |
38 | 40 |
当我们使用 PREVIOUSUNTILLAST
插值方式时,因为 D 到 M 这一段是未来的数据,我们不会进行插值,还是返回 null
SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[PREVIOUSUNTILLAST])
Time | last_temperature |
---|---|
8 | 25 |
13 | 25 |
18 | 26 |
23 | 29 |
28 | 40 |
33 | null |
38 | null |
第一个值与最后一个值的填充
IoTDB 的空值填充方式可以分为 PreviousFill, LinearFill, ValueFill 三大类。其中,PreviousFill 需要知道空值前的第一个非空数据,LinearFill 需要知道空值前后的第一个非空数据才能进行填充。假使某条查询语句返回的结果中第一个或最后一个值为空,就可能导致结果集在首尾存在一段连续的空值,不满足 GroupByFill 的业务期望。举例如下:
原始数据为
SELECT temperature FROM root.ln.wf01.wt01 where time >= 2017-11-07T23:49:00
Time | root.ln.wf01.wt01.temperature |
---|---|
|
|
2017-11-07T23:51:00.000+08:00 | 22.24 |
2017-11-07T23:53:00.000+08:00 | 24.58 |
2017-11-07T23:54:00.000+08:00 | 22.52 |
2017-11-07T23:57:00.000+08:00 | 24.39 |
2017-11-08T00:00:00.000+08:00 | 21.07 |
若直接使用 Linear 插值进行填充,会得到以下结果:
SELECT last_value(temperature) FROM root.ln.wf01.wt01 GROUP BY([2017-11-07T23:50:00, 2017-11-07T23:59:00),1m) FILL (LINEAR)
Time | root.ln.wf01.wt01.temperature |
---|---|
|
|
2017-11-07T23:51:00.000+08:00 | 22.24 |
2017-11-07T23:52:00.000+08:00 | 23.41 |
2017-11-07T23:53:00.000+08:00 | 24.58 |
2017-11-07T23:54:00.000+08:00 | 22.52 |
2017-11-07T23:55:00.000+08:00 | 23.143333 |
2017-11-07T23:56:00.000+08:00 | 23.766666 |
2017-11-07T23:57:00.000+08:00 | 24.39 |
2017-11-07T23:58:00.000+08:00 | null |
因为结果集的第一个和最后一个区间内恰好不存在数据,所以普通的 LinearFill 无法进行填充。在 Previous 的情形下,虽然最后一个值可被填充,但第一个值仍会因为缺少数据而无法填充。为了完成这一场景的填充需求,我们扩展了 Fill 语法的 <before_range, after_range> 逻辑。若在 GroupByFill 语句中设置 before_range 或 after_range 参数,GroupByFill 将会检索查询左区间往前 before_range 以及查询右区间往后 after_range 的数据,并用于空值填充,举例如下:
SELECT last_value(temperature) FROM root.ln.wf01.wt01 GROUP BY([2017-11-07T23:50:00, 2017-11-07T23:59:00),1m) FILL (LINEAR, 5m, 5m)
Time | root.ln.wf01.wt01.temperature |
---|---|
|
|
2017-11-07T23:51:00.000+08:00 | 22.24 |
2017-11-07T23:52:00.000+08:00 | 23.41 |
2017-11-07T23:53:00.000+08:00 | 24.58 |
2017-11-07T23:54:00.000+08:00 | 22.52 |
2017-11-07T23:55:00.000+08:00 | 23.143333 |
2017-11-07T23:56:00.000+08:00 | 23.766666 |
2017-11-07T23:57:00.000+08:00 | 24.39 |
2017-11-07T23:58:00.000+08:00 |
|
如上例所示,在指定了 before_range, after_range 参数后,GroupByFill 可以获取原始查询区间之外的数据,进而填充首尾空值
在使用 before_range, after_range 辅助填充时需注意:
- 为了不与原有语义冲突,当不设置 before_range, after_range 参数时,GroupByFill 的空值填充取空值的前一个/后一个非空值完成;当设置 before_range, after_range 参数时,设空值所在记录的时间戳为 t,GroupByFill 取 [t-before_range, t+after_range) 内的前一个/后一个非空值完成填充
Value 填充
GroupByFill 的 ValueFill 方式会将输入的常量值均解析为字符串,填充时尝试将字符串常量转换为对应类型的数据,若转换成功则进行填充,否则就不填充。举例如下:
SELECT last_value(temperature) FROM root.ln.wf01.wt01 GROUP BY([2017-11-07T23:50:00, 2017-11-07T23:59:00),1m) FILL (20.0)
Time | root.ln.wf01.wt01.temperature |
---|---|
|
|
2017-11-07T23:51:00.000+08:00 | 22.24 |
2017-11-07T23:52:00.000+08:00 | 20.0 |
2017-11-07T23:53:00.000+08:00 | 24.58 |
2017-11-07T23:54:00.000+08:00 | 22.52 |
2017-11-07T23:55:00.000+08:00 | 20.0 |
2017-11-07T23:56:00.000+08:00 | 20.0 |
2017-11-07T23:57:00.000+08:00 | 24.39 |
2017-11-07T23:58:00.000+08:00 |
|
SELECT last_value(temperature) FROM root.ln.wf01.wt01 GROUP BY([2017-11-07T23:50:00, 2017-11-07T23:59:00),1m) FILL ('temperature')
Time | root.ln.wf01.wt01.temperature |
---|---|
|
|
2017-11-07T23:51:00.000+08:00 | 22.24 |
2017-11-07T23:52:00.000+08:00 | null |
2017-11-07T23:53:00.000+08:00 | 24.58 |
2017-11-07T23:54:00.000+08:00 | 22.52 |
2017-11-07T23:55:00.000+08:00 | null |
2017-11-07T23:56:00.000+08:00 | null |
2017-11-07T23:57:00.000+08:00 | 24.39 |
2017-11-07T23:58:00.000+08:00 |
|
核心查询逻辑
在 GroupByFillDataSet
中维护了三个主要变量
// the last value and time for each aggregation private long[] previousTimes; private Object[] previousValues; // the next value and time for each aggregation private List<ElasticSerializableTVList> nextTVLists;
- previousTimes: 当前聚合算子的前一个非空值所在时间
- previousValues: 当前聚合算子的前一个非空值
- nextTVLists: 当前聚合算子的后一个非空值和时间
在 GroupByFillDataSet.initCache
对这三个变量进行了初始化
while (cacheSet.cardinality() < aggregations.size() && dataSet.hasNextWithoutConstraint()) { RowRecord record = dataSet.nextWithoutConstraint(); long timestamp = record.getTimestamp(); List<Field> fields = record.getFields(); for (int i = 0; i < fields.size(); i++) { Field field = fields.get(i); if (field == null) { continue; } if (ascending && timestamp < startTime) { previousTimes[i] = timestamp; previousValues[i] = field.getObjectValue(resultDataType[i]); } else if (!ascending && timestamp >= endTime) { previousTimes[i] = timestamp; previousValues[i] = field.getObjectValue(resultDataType[i]); } else { nextTVLists.get(i).put(timestamp, field.getObjectValue(resultDataType[i])); cacheSet.set(i); } } }
GroupByFillDataSet.initCache
不断提取行记录并缓存于 nextTVLists,直到所有聚合结果都找到第一个非空值。nextTVLists 使用 ElasticSerializableTVList 实现,是使用 LRU 算法维护的可进行外存交换的动态列表,默认每列使用 1MB 缓存空间,多余的缓存数据会直接存在外存中。每列的缓存容量可以通过设置参数 groupByFillCacheSizeInMB 改变。
填充过程
填充过程在nextWithoutConstraint
方法中完成,主要逻辑如下:
public RowRecord nextWithoutConstraint() throws IOException { if (!hasCachedTimeInterval) { throw new IOException( "need to call hasNext() before calling next() " + "in GroupByFillDataSet."); } hasCachedTimeInterval = false; RowRecord record; long curTimestamp; // 设置记录对应的时间戳 if (leftCRightO) { curTimestamp = curStartTime; record = new RowRecord(curStartTime); } else { curTimestamp = curEndTime - 1; record = new RowRecord(curEndTime - 1); } for (int i = 0; i < aggregations.size(); i++) { // 若当前列没有后继值,直接进入空值填充过程 if (nextTVLists.get(i).size() == nextIndices[i]) { fillRecord(i, record); continue; } // 提取当前列的第一个非空记录时间 long cacheTime = nextTVLists.get(i).getTime(nextIndices[i]); if (cacheTime == curTimestamp) { // 若该时间等于当前记录时间,则该值可以直接返回 record.addField(getNextCacheValue(i), resultDataType[i]); } else { // 否则进入空值填充过程 fillRecord(i, record); } } try { // 移动各列缓存 slideCache(record.getTimestamp()); } catch (QueryProcessException e) { logger.warn("group by fill has an exception while sliding: ", e); } return record; }