GroupByFill 查询的主要逻辑在 GroupByFillDataSet

  • org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet

GroupByFill 是对原降采样结果进行填充,支持使用 IoTDB 现有的全部聚合函数和空值填充方式,使用时需要注意以下两点:

  • GroupByFill 在任何情形下都不会填充 count 的聚合结果,因为在 IoTDB 中,若某个查询区间不存在任何数据,count 的聚合结果为 0

  • GroupByFill 会分类处理 sum 的聚合结果,在 IoTDB 中,若某个查询区间不存在任何数据,sum 的聚合结果为 null,将被 GroupByFill 填充;若某个查询区间内 sum 的聚合结果恰好为 0,那么 GroupByFill 不会填充这个值

PREVIOUSUNTILLAST 与 PREVIOUS 填充的区别

Previous 填充方式的语意没有变,只要前面有值,就可以拿过来填充; PREVIOUSUNTILLAST 考虑到在某些业务场景下,所填充的值的时间不能大于该时间序列 last 的时间戳(从业务角度考虑,取历史数据不能取未来历史数据) 看下面的例子,或许更容易理解

A 点时间戳为 1,B 为 5,C 为 20,D 为 30,N 为 8,M 为 38

原始数据为

select temperature FROM root.ln.wf01.wt01 where time >= 1 and time <= 38

Timeroot.ln.wf01.wt01.temperature
121
323
525
2026
2729
2830
3040

当我们使用 Previous 插值方式时,即使 D 到 M 这一段是未来的数据,我们也会用 D 点的数据进行填充

SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[previous])

Timelast_temperature
825
1325
1826
2329
2840
3340
3840

当我们使用 PREVIOUSUNTILLAST 插值方式时,因为 D 到 M 这一段是未来的数据,我们不会进行插值,还是返回 null

SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[PREVIOUSUNTILLAST])

Timelast_temperature
825
1325
1826
2329
2840
33null
38null

第一个值与最后一个值的填充

IoTDB 的空值填充方式可以分为 PreviousFill, LinearFill, ValueFill 三大类。其中,PreviousFill 需要知道空值前的第一个非空数据,LinearFill 需要知道空值前后的第一个非空数据才能进行填充。假使某条查询语句返回的结果中第一个或最后一个值为空,就可能导致结果集在首尾存在一段连续的空值,不满足 GroupByFill 的业务期望。举例如下:

原始数据为

SELECT temperature FROM root.ln.wf01.wt01 where time >= 2017-11-07T23:49:00
Timeroot.ln.wf01.wt01.temperature
2017-11-07T23:49:00.000+08:00
23.7
2017-11-07T23:51:00.000+08:0022.24
2017-11-07T23:53:00.000+08:0024.58
2017-11-07T23:54:00.000+08:0022.52
2017-11-07T23:57:00.000+08:0024.39
2017-11-08T00:00:00.000+08:0021.07


若直接使用 Linear 插值进行填充,会得到以下结果:

SELECT last_value(temperature) FROM root.ln.wf01.wt01 GROUP BY([2017-11-07T23:50:00, 2017-11-07T23:59:00),1m) FILL (LINEAR)

Timeroot.ln.wf01.wt01.temperature
2017-11-07T23:50:00.000+08:00
null
2017-11-07T23:51:00.000+08:0022.24
2017-11-07T23:52:00.000+08:0023.41
2017-11-07T23:53:00.000+08:0024.58
2017-11-07T23:54:00.000+08:0022.52
2017-11-07T23:55:00.000+08:0023.143333
2017-11-07T23:56:00.000+08:0023.766666
2017-11-07T23:57:00.000+08:0024.39
2017-11-07T23:58:00.000+08:00null


因为结果集的第一个和最后一个区间内恰好不存在数据,所以普通的 LinearFill 无法进行填充。在 Previous 的情形下,虽然最后一个值可被填充,但第一个值仍会因为缺少数据而无法填充。为了完成这一场景的填充需求,我们扩展了 Fill 语法的 <before_range, after_range> 逻辑。若在 GroupByFill 语句中设置 before_range 或 after_range 参数,GroupByFill 将会检索查询左区间往前 before_range 以及查询右区间往后 after_range 的数据,并用于空值填充,举例如下:

SELECT last_value(temperature) FROM root.ln.wf01.wt01 GROUP BY([2017-11-07T23:50:00, 2017-11-07T23:59:00),1m) FILL (LINEAR, 5m, 5m)

Timeroot.ln.wf01.wt01.temperature
2017-11-07T23:50:00.000+08:00
22.970001
2017-11-07T23:51:00.000+08:0022.24
2017-11-07T23:52:00.000+08:0023.41
2017-11-07T23:53:00.000+08:0024.58
2017-11-07T23:54:00.000+08:0022.52
2017-11-07T23:55:00.000+08:0023.143333
2017-11-07T23:56:00.000+08:0023.766666
2017-11-07T23:57:00.000+08:0024.39
2017-11-07T23:58:00.000+08:00
23.283333



如上例所示,在指定了 before_range, after_range 参数后,GroupByFill 可以获取原始查询区间之外的数据,进而填充首尾空值

在使用 before_range, after_range 辅助填充时需注意:

  • 为了不与原有语义冲突,当不设置 before_range, after_range 参数时,GroupByFill 的空值填充取空值的前一个/后一个非空值完成;当设置 before_range, after_range 参数时,设空值所在记录的时间戳为 t,GroupByFill 取 [t-before_range, t+after_range) 内的前一个/后一个非空值完成填充

Value 填充

GroupByFill 的 ValueFill 方式会将输入的常量值均解析为字符串,填充时尝试将字符串常量转换为对应类型的数据,若转换成功则进行填充,否则就不填充。举例如下:

SELECT last_value(temperature) FROM root.ln.wf01.wt01 GROUP BY([2017-11-07T23:50:00, 2017-11-07T23:59:00),1m) FILL (20.0)

Timeroot.ln.wf01.wt01.temperature
2017-11-07T23:50:00.000+08:00
20.0
2017-11-07T23:51:00.000+08:0022.24
2017-11-07T23:52:00.000+08:0020.0
2017-11-07T23:53:00.000+08:0024.58
2017-11-07T23:54:00.000+08:0022.52
2017-11-07T23:55:00.000+08:0020.0
2017-11-07T23:56:00.000+08:0020.0
2017-11-07T23:57:00.000+08:0024.39
2017-11-07T23:58:00.000+08:00
20.0

SELECT last_value(temperature) FROM root.ln.wf01.wt01 GROUP BY([2017-11-07T23:50:00, 2017-11-07T23:59:00),1m) FILL ('temperature')

Timeroot.ln.wf01.wt01.temperature
2017-11-07T23:50:00.000+08:00
null
2017-11-07T23:51:00.000+08:0022.24
2017-11-07T23:52:00.000+08:00null
2017-11-07T23:53:00.000+08:0024.58
2017-11-07T23:54:00.000+08:0022.52
2017-11-07T23:55:00.000+08:00null
2017-11-07T23:56:00.000+08:00null
2017-11-07T23:57:00.000+08:0024.39
2017-11-07T23:58:00.000+08:00
null

核心查询逻辑

GroupByFillDataSet 中维护了三个主要变量

// the last value and time for each aggregation
private long[] previousTimes;
private Object[] previousValues;

// the next value and time for each aggregation
private List<ElasticSerializableTVList> nextTVLists;
  • previousTimes: 当前聚合算子的前一个非空值所在时间
  • previousValues: 当前聚合算子的前一个非空值
  • nextTVLists: 当前聚合算子的后一个非空值和时间

GroupByFillDataSet.initCache 对这三个变量进行了初始化

while (cacheSet.cardinality() < aggregations.size() && dataSet.hasNextWithoutConstraint()) {
  RowRecord record = dataSet.nextWithoutConstraint();
  long timestamp = record.getTimestamp();
  List<Field> fields = record.getFields();
  for (int i = 0; i < fields.size(); i++) {
    Field field = fields.get(i);
    if (field == null) {
      continue;
    }

    if (ascending && timestamp < startTime) {
      previousTimes[i] = timestamp;
      previousValues[i] = field.getObjectValue(resultDataType[i]);
    } else if (!ascending && timestamp >= endTime) {
      previousTimes[i] = timestamp;
      previousValues[i] = field.getObjectValue(resultDataType[i]);
    } else {
      nextTVLists.get(i).put(timestamp, field.getObjectValue(resultDataType[i]));
      cacheSet.set(i);
    }
  }
}

GroupByFillDataSet.initCache 不断提取行记录并缓存于 nextTVLists,直到所有聚合结果都找到第一个非空值。nextTVLists 使用 ElasticSerializableTVList 实现,是使用 LRU 算法维护的可进行外存交换的动态列表,默认每列使用 1MB 缓存空间,多余的缓存数据会直接存在外存中。每列的缓存容量可以通过设置参数 groupByFillCacheSizeInMB 改变。

填充过程

填充过程在nextWithoutConstraint方法中完成,主要逻辑如下:

public RowRecord nextWithoutConstraint() throws IOException {
  if (!hasCachedTimeInterval) {
    throw new IOException(
        "need to call hasNext() before calling next() " + "in GroupByFillDataSet.");
  }

  hasCachedTimeInterval = false;
  RowRecord record;
  long curTimestamp;
  // 设置记录对应的时间戳
  if (leftCRightO) {
    curTimestamp = curStartTime;
    record = new RowRecord(curStartTime);
  } else {
    curTimestamp = curEndTime - 1;
    record = new RowRecord(curEndTime - 1);
  }

  for (int i = 0; i < aggregations.size(); i++) {
    // 若当前列没有后继值,直接进入空值填充过程
    if (nextTVLists.get(i).size() == nextIndices[i]) {
      fillRecord(i, record);
      continue;
    }

	// 提取当前列的第一个非空记录时间
    long cacheTime = nextTVLists.get(i).getTime(nextIndices[i]);
    if (cacheTime == curTimestamp) {
	  // 若该时间等于当前记录时间,则该值可以直接返回
      record.addField(getNextCacheValue(i), resultDataType[i]);
    } else {
      // 否则进入空值填充过程
      fillRecord(i, record);
    }
  }

  try {
    // 移动各列缓存
    slideCache(record.getTimestamp());
  } catch (QueryProcessException e) {
    logger.warn("group by fill has an exception while sliding: ", e);
  }

  return record;
}


  • No labels