Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

背景

目前,除了不带值过滤的原始数据查询,对于 Vector 时间序列的处理,是将其转化到分量然后当做普通的时间序列处理,即如果查了 Vector 时间序列的 K 个分量,则需要建 K 个 Reader 进行 K 次查询,因此我们接下来的优化是,将属于同一个 Vector 时间序列的 K 个分量聚合到一起进行查询,即只进行一次查询将所有的分量数据查出来。在聚合上,则涉及到如果使用多个分量的统计信息,以及如何将 BatchData 内的多个分量值列,映射到各个分量进行更新。


实现方案

上层,首先我们需要一个 Map<String, List<Integer>> subPathToAggrIndexMap, 表明每个分量在结果集中的位置。

例如 select count(s1), count(s2), max_value(s1), min_value(s2) from root.sg.d1.vector1

...

从底层来看,没有值过滤条件的聚合为每一条时间序列创建一个 SeriesAggregateReader,然后根据该 Reader 判断是否利用统计信息更新,包括 timeSeriesMetaData Statistics, ChunkMetaData statistics, 和 Page Statistics,涉及接口分别为

...

或者利用数据值更新,即利用 batchData 进行更新,涉及接口分别为 `void updateResultFromPageData(BatchData dataInThisPage)`


由于一条多元时间序列的每个 由于一条 Vector 时间序列的每个 MetaData 内都包含了多个统计信息,且得到的 BatchData 也包含一个时间列和多个值列,因此我们需要指定需要哪一个分量的统计信息,以及需要 BatchData 里的第几列作为值列。


这里 SeriesAggregateReader 实现了 IAggregateReader 接口,接口方法抽象的不错,因此可以直接实现一个新的类 VectorSeriesAggregateReader,实现其接口方法“,实现其接口方法


public interface IAggregateReader {

boolean canUseCurrentFileStatistics() throws IOException;

Statistics currentFileStatistics() throws IOException;

boolean canUseCurrentChunkStatistics() throws IOException;

Statistics currentChunkStatistics() throws IOException;

/** only be used without value filter */
boolean canUseCurrentPageStatistics() throws IOException;

/** only be used without value filter */
Statistics currentPageStatistics() throws IOException;

...

...

public Statistics getStatistics(int index) {
return valueTimeseriesMetadataList.get(index).getStatistics();
}


并在外层控制游标:

while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) {
Statistics statistics = getStatistics(seriesReader.getCurIndex());

updateUsingStatistics(statistics);
seriesReader.nextIndex();
}


注意这里要把聚合结果集 aggregateResultList 按分量进行分组,保证每次使用 statistics 更新的都是属于该分量的聚合结果。

因此在上层,我们需要一个 Map<String, List<Integer>> subPathToAggrIndexMap, 表明每个分量在结果集中的位置。

例如 select count(s1), count(s2), max_value(s1), min_value(s2) from root.sg.d1.vector1

则会聚合为一条路径,root.sg.d1.vector1[s1, s2] 和一个 Map, (s1 →  [0, 2], s2 → [1,3]),然后为每个 Map Entry 分组建立聚合结果集。



对于利用数据值更新,为了最大程度的利用现有接口,使改动最小化,选择保持 void updateResultFromPageData(BatchData dataInThisPage) 接口不变,
而将一个 VectorBatchData 拆解为多个分量的 subBatchData, 然后利用现有接口进行后续操作。

这样改动也可以直接利用上面根据分量进行的分组进行操作。
BatchData nextOverlappedPageData = seriesReader.nextPage();
BatchData[] subBatchData = nextOverlappedPageData.generateSubBatchData();
while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) {
updateUsingSubBatchData(SubAggrResultList, subBatchData)
seriesReader.nextIndex();
}