设计原理

IoTDB server 模块共提供 3 种不同形式的针对单个时间序列的读取接口,以支持不同形式的查询。

  • 原始数据查询接口,返回 BatchData,可带时间过滤条件或值过滤条件,两种过滤不可同时存在。

  • 聚合查询接口 (主要用于聚合查询和降采样查询)

  • 按递增时间戳查询对应值的接口(主要用于带值过滤的查询)

相关接口

以上三种读取单个时间序列数据的方式对应代码里的三个接口

org.apache.iotdb.tsfile.read.reader.IBatchReader

主要方法

// 判断是否还有 BatchData
boolean hasNextBatch() throws IOException;

// 获得下一个 BatchData,并把游标后移
BatchData nextBatch() throws IOException;

使用流程

while (batchReader.hasNextBatch()) {
BatchData batchData = batchReader.nextBatch();

// use batchData to do some work
...
}

org.apache.iotdb.db.query.reader.series.IAggregateReader

主要方法

// 判断是否还有 Chunk
boolean hasNextChunk() throws IOException;

// 判断是否能够使用当前 Chunk 的统计信息
boolean canUseCurrentChunkStatistics();

// 获得当前 Chunk 的统计信息
Statistics currentChunkStatistics();

// 跳过当前 Chunk
void skipCurrentChunk();

// 判断当前 Chunk 是否还有下一个 Page
boolean hasNextPage() throws IOException;

// 判断能否使用当前 Page 的统计信息
boolean canUseCurrentPageStatistics() throws IOException;

// 获得当前 Page 的统计信息
Statistics currentPageStatistics() throws IOException;

// 跳过当前的 Page
void skipCurrentPage();

// 获得当前 Page 的数据
BatchData nextPage() throws IOException;

一般使用流程

while (aggregateReader.hasNextChunk()) {
if (aggregateReader.canUseCurrentChunkStatistics()) {
  Statistics chunkStatistics = aggregateReader.currentChunkStatistics();
   
  // 用 chunk 层的统计信息计算
  ...
   
  aggregateReader.skipCurrentChunk();
  continue;
}
 
// 把当前 chunk 中的 page 消耗完
while (aggregateReader.hasNextPage()) {
if (aggregateReader.canUseCurrentPageStatistics()) {
  // 可以用统计信息
  Statistics pageStatistic = aggregateReader.currentPageStatistics();
 
  // 用 page 层的统计信息计算
  ...
 
  aggregateReader.skipCurrentPage();
  continue;
} else {
  // 不能用统计信息,需要用数据计算
  BatchData batchData = aggregateReader.nextOverlappedPage();
 
  // 用 batchData 计算
  ...
}
}
}

org.apache.iotdb.db.query.reader.IReaderByTimestamp

主要方法

// 给定一批递增时间戳的值,返回一批结果(减少方法调用次数)
Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException;

一般使用流程

该接口在带值过滤的查询中被使用,TimeGenerator 生成时间戳后,使用该接口获得该时间戳对应的 value

Object[] values = readerByTimestamp.getValueInTimestamp(timestamps, length);

具体实现类

上述三个接口都有其对应的实现类,由于以上三种查询有共性,我们设计了一个基础的 SeriesReader 工具类,封装了对于一个时间序列读取操作的基本方法,帮助实现以上三种接口。下面首先介绍 SeriesReader 的设计原理,然后再依次介绍三个接口的具体实现。

org.apache.iotdb.db.query.reader.series.SeriesReader

设计思想

背景知识:TsFile 文件(TsFileResource)解开后可以得到 ChunkMetadata,ChunkMetadata 解开后可以得到一堆 PageReader,PageReader 可以直接返回 BatchData 数据点。

为了支持以上三种接口

数据按照粒度从大到小分成四种:文件,Chunk,Page,相交数据点。在原始数据查询中,最大的数据块返回粒度是一个 page,如果一个 page 和其他 page 由于乱序写入相互覆盖了,就解开成数据点做合并。聚合查询中优先使用 Chunk 的统计信息,其次是 Page 的统计信息,最后是相交数据点。

设计原则是能用粒度大的就不用粒度小的。

首先介绍一下 SeriesReader 里的几个重要字段


/*
* 文件层
*/
private final List<TsFileResource> seqFileResource;
顺序文件列表,因为顺序文件本身就保证有序,且时间戳互不重叠,只需使用 List 进行存储

private final PriorityQueue<TsFileResource> unseqFileResource;
乱序文件列表,因为乱序文件互相之间不保证顺序性,且可能有重叠,为了保证顺序,使用优先队列进行存储

/*
* chunk 层
*
* 三个字段之间数据永远不重复,first 永远是第一个(开始时间最小)
*/
private ChunkMetaData firstChunkMetaData;
填充 chunk 层时优先填充此字段,保证这个 chunk 具有当前最小开始时间

private final List<ChunkMetaData> seqChunkMetadatas;
顺序文件解开后得到的 ChunkMetaData 存放在此,本身有序且互不重叠,所以使用 List 存储

private final PriorityQueue<ChunkMetaData> unseqChunkMetadatas;
乱序文件解开后得到的 ChunkMetaData 存放在此,互相之间可能有重叠,为了保证顺序,使用优先队列进行存储

/*
* page 层
*
* 两个字段之间数据永远不重复,first 永远是第一个(开始时间最小)
*/
private VersionPageReader firstPageReader;
开始时间最小的 page reader

private PriorityQueue<VersionPageReader> cachedPageReaders;
当前获得的所有 page reader,按照每个 page 的起始时间进行排序

/*
* 相交数据点层
*/
private PriorityMergeReader mergeReader;
本质上是多个带优先级的 page,按时间戳从低到高输出数据点,时间戳相同时,保留优先级高的

/*
* 相交数据点产出结果的缓存
*/
private boolean hasCachedNextOverlappedPage;
是否缓存了下一个 batch

private BatchData cachedBatchData;
缓存的下一个 batch 的引用

下面介绍一下 SeriesReader 里的重要方法

hasNextChunk()

  • 主要功能:判断该时间序列还有没有下一个 chunk。

  • 约束:在调用这个方法前,需要保证 SeriesReader 内已经没有 page 和 数据点 层级的数据了,也就是之前解开的 chunk 都消耗完了。

  • 实现:如果 firstChunkMetaData 不为空,则代表当前已经缓存了第一个 ChunkMetaData,且未被使用,直接返回true

    尝试去解开第一个顺序文件和第一个乱序文件,填充 chunk 层。并解开与 firstChunkMetadata 相重合的所有文件。

isChunkOverlapped()

  • 主要功能:判断当前的 chunk 有没有与其他 Chunk 有重叠

  • 约束:在调用这个方法前,需要保证 chunk 层已经缓存了 firstChunkMetadata,也就是调用了 hasNextChunk() 并且为 true。

  • 实现:直接把 firstChunkMetadataseqChunkMetadatasunseqChunkMetadatas 相比较。因为此前已经保证所有和 firstChunkMetadata 相交的文件都被解开了。

currentChunkStatistics()

返回 firstChunkMetaData 的统计信息。

skipCurrentChunk()

跳过当前 chunk。只需要将firstChunkMetaData置为null

hasNextPage()

  • 主要功能:判断 SeriesReader 中还有没有已经解开的 page,如果有相交的 page,就构造 cachedBatchData 并缓存,否则缓存 firstPageReader

  • 实现:如果已经缓存了 cachedBatchData 就直接返回。如果有相交数据点,就构造 cachedBatchData。如果已经缓存了 firstPageReader,就直接返回。

    如果当前的 firstChunkMetadata 还没有解开,就解开与之重叠的所有 ChunkMetadata,构造 firstPageReader。

    判断,如果 firstPageReadercachedPageReaders 相交,则构造 cachedBatchData,否则直接返回。

isPageOverlapped()

  • 主要功能:判断当前的 page 有没有与其他 page 有重叠

  • 约束:在调用这个方法前,需要保证调用了 hasNextPage() 并且为 true。也就是,有可能缓存了一个相交的 cachedBatchData,或者缓存了不相交的 firstPageReader

  • 实现:先判断有没有 cachedBatchData,如果没有,就说明当前 page 不相交,则 mergeReader 里没数据。再判断 firstPageReader 是否与 cachedPageReaders 中的 page 相交。

currentPageStatistics()

返回 firstPageReader 的统计信息。

skipCurrentPage()

跳过当前 Page。只需要将 firstPageReader 置为 null。

nextPage()

  • 主要功能:返回下一个相交或不相交的 page

  • 约束:在调用这个方法前,需要保证调用了 hasNextPage() 并且为 true。也就是,有可能缓存了一个相交的 cachedBatchData,或者缓存了不相交的 firstPageReader

  • 实现:如果 hasCachedNextOverlappedPage 为 true,说明缓存了一个相交的 page,直接返回 cachedBatchData。否则当前 page 不相交,直接从 firstPageReader 里拿当前 page 的数据。

hasNextOverlappedPage()

  • 主要功能:内部方法,用来判断当前有没有重叠的数据,并且构造相交的 page 并缓存。

  • 实现:如果 hasCachedNextOverlappedPagetrue,直接返回 true

    否则,先调用 tryToPutAllDirectlyOverlappedPageReadersIntoMergeReader() 方法,将 cachedPageReaders 中所有与 firstPageReader 有重叠的放进 mergeReader 里。mergeReader 里维护了一个 currentLargestEndTime 变量,每次添加进新的 Reader 时被更新,用以记录当前添加进 mergeReader 的 page 的最大结束时间。 然后先从mergeReader里取出当前最大的结束时间,作为第一批数据的结束时间,记为currentPageEndTime。接着去遍历mergeReader,直到当前的时间戳大于currentPageEndTime

    每从 mergeReader 移出一个点前,我们先要判断是否有与当前时间戳重叠的 file 或者 chunk 或者 page(这里之所以还要再做一次判断,是因为,比如当前 page 是 1-30,和他直接相交的 page 是 20-50,还有一个 page 是 40-60,每取一个点判断一次是想把 40-60 解开),如果有,解开相应的 file 或者 chunk 或者 page,并将其放入mergeReader。完成重叠的判断后,从mergeReader中取出相应数据。

    完成迭代后将获得数据缓存在 cachedBatchData 中,并将 hasCachedNextOverlappedPage 置为 true

nextOverlappedPage()

将缓存的cachedBatchData返回,并将hasCachedNextOverlappedPage置为false

org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader

SeriesRawDataBatchReader实现了IBatchReader

其方法hasNextBatch()的核心判断流程是

// 有缓存了 batch,直接返回
if (hasCachedBatchData) {
return true;
}

/*
* 如果 SeriesReader 里还有 page,返回 page
*/
if (readPageData()) {
hasCachedBatchData = true;
return true;
}

/*
* 如果 SeriesReader 里还有 chunk,返回 chunk
*/
if (readChunkData()) {
hasCachedBatchData = true;
return true;
}

/*
* 如果有 File,并且有 chunk,返回 chunk
*/
while (seriesReader.hasNextFile()) {
if (readChunkData()) {
  hasCachedBatchData = true;
  return true;
}
}
return hasCachedBatchData;

org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp

SeriesReaderByTimestamp 实现了 IReaderByTimestamp

设计思想:当给一个时间戳要查询值时,这个时间戳可以转化为一个 time >= x 的过滤条件。不断更新这个过滤条件,并且跳过不满足的文件,chunk 和 page。

实现方式:

/*
* 优先判断下一个 page 有没有当前所查时间,能跳过就跳过
*/
if (readPageData(timestamp)) {
return true;
}

/*
* 判断下一个 chunk 有没有当前所查时间,能跳过就跳过
*/
if (readChunkData(timestamp)) {
return true;
}

/*
* 判断下一个 File 有没有当前所查时间,能跳过就跳过
*/
while (seriesReader.hasNextFile()) {
Statistics statistics = seriesReader.currentChunkStatistics();
if (!satisfyTimeFilter(statistics)) {
  seriesReader.skipCurrentChunk();
  continue;
}
/*
  * chunk 不能跳过,继续到 chunk 里检查 page
  */
if (readChunkData(timestamp)) {
  return true;
}
}
return false;

org.apache.iotdb.db.query.reader.series.SeriesAggregateReader

SeriesAggregateReader 实现了 IAggregateReader

IAggregateReader的大部分接口方法都在SeriesReader有对应实现,除了canUseCurrentChunkStatistics()canUseCurrentPageStatistics()两个方法。

canUseCurrentChunkStatistics()

设计思想:可以用统计信息的条件是当前 chunk 不重叠,并且满足过滤条件。

先调用SeriesReadercurrentChunkStatistics()方法,获得当前 chunk 的统计信息,再调用SeriesReaderisChunkOverlapped()方法判断当前 chunk 是否重叠,如果当前 chunk 不重叠,且其统计信息满足过滤条件,则返回true,否则返回false

canUseCurrentPageStatistics()

设计思想:可以用统计信息的条件是当前 page 不重叠,并且满足过滤条件。

先调用SeriesReadercurrentPageStatistics() 方法,获得当前 page 的统计信息,再调用SeriesReaderisPageOverlapped() 方法判断当前 page 是否重叠,如果当前 page 不重叠,且其统计信息满足过滤条件,则返回true,否则返回false

  • No labels