空值填充的主要逻辑在 FillQueryExecutor

  • org.apache.iotdb.db.query.executor.FillQueryExecutor

IoTDB 中支持两种填充方式,Previous 填充和 Linear 填充。

Previous 填充

Previous 填充是使用离查询时间戳最近且小于等于查询时间戳的值来进行填充的一种方式,下图展示了一种可能的填充情况。

                                     |
+-------------------------+         |
|     seq files           |         |
+-------------------------+         |
                                    |
          +---------------------------+
          |     unseq files       | |
          +---------------------------+
                                    |
                                queryTime

设计原理

在实际生产场景中,可能需要在大量的顺序和乱序 TsFile 中查找可用的填充值,因此解开大量 TsFile 文件会成为性能瓶颈。我们设计的思路在于尽可能的减少需要解开 tsFile 文件的个数,从而加速找到填充值这一过程。

顺序文件查找"最近点"

首先在顺序文件中查询满足条件的"最近点"。由于所有顺序文件及其所有包含的 chunk 数据都已经按照顺序排列,因此这个"最近点"可以通过retrieveValidLastPointFromSeqFiles()方法很容易找到。

这个找到的"最近点"是可以成为 Previous 填充的最终值的,因此它在接下来的搜索中能够被用来作为下界来寻找其他更好的的填充值。接下来只有那些时间戳大于这个下界的点才是 Previous 填充的结果值。

下例中的 unseq 文件由于其最大值没有超过"最近点",因此不需要再进一步搜索。

                        last point
                            |       |
          +---------------------------+
          |   seq file     |       | |
          +---------------------------+
+-------------------------+ |       |
|     unseq file         | |       |
+-------------------------+ |       |
                            |       |
                                    |
                              queryTime

筛选乱序文件

筛选并解开乱序文件的逻辑在成员函数UnpackOverlappedUnseqFiles(long lBoundTime)中,该方法接受一个下界参数,将满足要求的乱序文件解开且把它们的 TimeseriesMetadata 结构存入unseqTimeseriesMetadataList中。

UnpackOverlappedUnseqFiles(long lBoundTime)方法首先用 lBoundTime排除掉不满足要求的乱序文件,剩下被筛选后的乱序文件都一定会包含queryTime时间戳,即 startTime <= queryTime and queryTime <= endTime。如果乱序文件的 start time 大于lBoundTime,更新lBoundTime

下图中,case 1 中的lBoundTime将会被更新,而 case 2 中的则不会被更新。

case1           case2
|             |                     |
|         +---------------------------+
|         |   |   unseq file       | |
|         +---------------------------+
|             |                     |
lBoundTime   lBoundTime           queryTime
while (!unseqFileResource.isEmpty()) {
// The very end time of unseq files is smaller than lBoundTime,
// then skip all the rest unseq files
if (unseqFileResource.peek().getEndTimeMap().get(seriesPath.getDevice()) < lBoundTime) {
  return;
}
TimeseriesMetadata timeseriesMetadata =
    FileLoaderUtils.loadTimeSeriesMetadata(
        unseqFileResource.poll(), seriesPath, context, timeFilter, allSensors);
if (timeseriesMetadata != null && timeseriesMetadata.getStatistics().canUseStatistics()
    && lBoundTime <= timeseriesMetadata.getStatistics().getEndTime()) {
  lBoundTime = Math.max(lBoundTime, timeseriesMetadata.getStatistics().getStartTime());
  unseqTimeseriesMetadataList.add(timeseriesMetadata);
  break;
}
}

接下来我们可以使用得到的 Timeseries Metadata 来查找所有其他重叠的乱序文件。需要注意的依旧是当一个重叠的乱序文件被找到后,需要按情况更新lBoundTime

while (!unseqFileResource.isEmpty()
      && (lBoundTime <= unseqFileResource.peek().getEndTimeMap().get(seriesPath.getDevice()))) {
TimeseriesMetadata timeseriesMetadata =
    FileLoaderUtils.loadTimeSeriesMetadata(
        unseqFileResource.poll(), seriesPath, context, timeFilter, allSensors);
unseqTimeseriesMetadataList.add(timeseriesMetadata);
// update lBoundTime if current unseq timeseriesMetadata's last point is a valid result
if (timeseriesMetadata.getStatistics().canUseStatistics()
    && endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) {
  lBoundTime = Math.max(lBoundTime, timeseriesMetadata.getStatistics().getEndTime());
}
}

组合最终结果

最终结果在方法getFillResult() 中生成。

public TimeValuePair getFillResult() throws IOException {
  TimeValuePair lastPointResult = retrieveValidLastPointFromSeqFiles();
  UnpackOverlappedUnseqFiles(lastPointResult.getTimestamp());
   
  long lastVersion = 0;
  PriorityQueue<ChunkMetadata> sortedChunkMetatdataList = sortUnseqChunkMetadatasByEndtime();
  while (!sortedChunkMetatdataList.isEmpty()
      && lastPointResult.getTimestamp() <= sortedChunkMetatdataList.peek().getEndTime()) {
    ChunkMetadata chunkMetadata = sortedChunkMetatdataList.poll();
    TimeValuePair lastChunkPoint = getChunkLastPoint(chunkMetadata);
    if (shouldUpdate(lastPointResult.getTimestamp(), lastVersion,
        lastChunkPoint.getTimestamp(), chunkMetadata.getVersion())) {
      lastPointResult = lastChunkPoint;
      lastVersion = chunkMetadata.getVersion();
    }
  }
  return lastPointResult;
}

Linear 填充

对于 T 时间的 Linear Fill 线性填充值是由该时间序列的两个相关值做线性拟合得到的:T 之前的最近时间戳对应的值,T 之后的最早时间戳对应的值。 基于这种特点,线性填充只能被应用于数字类型如:int, double, float。

计算前时间值

前时间值使用与 Previous Fill 中相同方式计算。

计算后时间值

后时间值使用聚合运算中的"MIN_TIME"和"FIRST_VALUE",分别计算出 T 时间之后最近的时间戳和对应的值,并组成 time-value 对返回。

  • No labels