Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


上层,首先我们需要一个 Map<String, List<Integer>> subPathToAggrIndexMap, 表明每个分量在结果集中的位置。

例如 select count(s1), count(s2), max_value(s1), min_value(s2) from root.sg.d1.vector1

则会聚合为一条路径,root.sg.d1.vector1[s1, s2] 和一个 Map, (s1 →  [0, 2], s1 → [1], s2→ [2])

从底层来看,没有值过滤条件的聚合为每一条时间序列创建一个 SeriesAggregateReader,然后根据该 Reader 判断是否利用统计信息更新,包括 timeSeriesMetaData Statistics, ChunkMetaData statistics, 和 Page Statistics,涉及接口分别为

...

或者利用数据值更新,即利用 batchData 进行更新,涉及接口分别为 `void updateResultFromPageData(BatchData dataInThisPage)`


由于一条多元时间序列的每个 由于一条多元时间序列的每个 MetaData 内都包含了多个统计信息,且得到的 BatchData 也包含一个时间列和多个值列,因此我们需要指定需要哪一个分量的统计信息,以及需要 BatchData 里的第几列作为值列。


这里 SeriesAggregateReader 实现了 IAggregateReader 接口,接口方法抽象的不错,因此可以直接实现一个新的类 VectorSeriesAggregateReader,实现其接口方法“,实现其接口方法


public interface IAggregateReader {

boolean canUseCurrentFileStatistics() throws IOException;

Statistics currentFileStatistics() throws IOException;

boolean canUseCurrentChunkStatistics() throws IOException;

Statistics currentChunkStatistics() throws IOException;

/** only be used without value filter */
boolean canUseCurrentPageStatistics() throws IOException;

/** only be used without value filter */
Statistics currentPageStatistics() throws IOException;

...

...

public Statistics getStatistics(int index) {
return valueTimeseriesMetadataList.get(index).getStatistics();
}


并在外层控制游标:

while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) {
Statistics statistics = getStatistics(seriesReader.getCurIndex());

updateUsingStatistics(statistics);
seriesReader.nextIndex();
}

注意这里要把聚合结果集 aggregateResultList 按分量进行分组,保证每次使用 statistics 更新的都是属于该分量的聚合结果。

因此在上层,我们需要一个 Map<String, List<Integer>> subPathToAggrIndexMap, 表明每个分量在结果集中的位置。

例如 select count(s1), count(s2), max_value(s1), min_value(s2) from root.sg.d1.vector1

则会聚合为一条路径,root.sg.d1.vector1[s1, s2] 和一个 Map, (s1 →  [0, 2], s1 → [1], s2→ [2])