You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 5 Next »


从底层来看,没有值过滤条件的聚合为每一条时间序列创建一个 SeriesAggregateReader,然后根据该 Reader 判断是否利用统计信息更新,包括 timeSeriesMetaData Statistics, ChunkMetaData statistics, 和 Page Statistics,涉及接口分别为

timeSeriesMetaData Statistics: `boolean canUseCurrentFileStatistics() throws IOException;`, `Statistics currentFileStatistics() throws IOException;` 以及

ChunkMetaData statistics: `canUseCurrentChunkStatistics()` 和 `Statistics currentChunkStatistics()`,

Page statistics: `canUseCurrentPageStatistics()` 和 `currentPageStatistics()`,

更新接口为: `void updateResultFromStatistics(Statistics statistics)`

或者利用数据值更新,即利用 batchData 进行更新,涉及接口分别为 `void updateResultFromPageData(BatchData dataInThisPage)`


由于一条多元时间序列的每个 MetaData 内都包含了多个统计信息,且得到的 BatchData 也包含一个时间列和多个值列,因此我们需要指定需要哪一个分量的统计信息,以及需要 BatchData 里的第几列作为值列。


这里 SeriesAggregateReader 实现了 IAggregateReader 接口,接口方法抽象的不错,因此可以直接实现一个新的类 VectorSeriesAggregateReader,实现其接口方法


public interface IAggregateReader {

boolean canUseCurrentFileStatistics() throws IOException;

Statistics currentFileStatistics() throws IOException;

boolean canUseCurrentChunkStatistics() throws IOException;

Statistics currentChunkStatistics() throws IOException;

/** only be used without value filter */
boolean canUseCurrentPageStatistics() throws IOException;

/** only be used without value filter */
Statistics currentPageStatistics() throws IOException;

...

}


以使用 TimeSeriesMetaData 为例:


由于一个 Vector存在多个分量的 TimeSeriesMetaData, 因此在使用 currentFileStatistics() 接口时,需要传入一个参数来表明需要哪一个分量的 TimeSeriesMetaData statistics。


我们在 VectorSeriesAggregateReader 类中 定义一个游标 curIndex,来表示当前遍历的分量位置, 则 currentFileStatistics() 方法可以重写为:


@Override
public Statistics currentFileStatistics() throws IOException {
return seriesReader.currentFileStatistics(curIndex);
}


之后拿到第 curIndex 个分量的统计信息返回给上层。


public Statistics getStatistics(int index) {
return valueTimeseriesMetadataList.get(index).getStatistics();
}


并在外层控制游标:

while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) {
Statistics statistics = getStatistics(seriesReader.getCurIndex());

updateUsingStatistics(statistics);
seriesReader.nextIndex();
}

注意这里要把聚合结果集 aggregateResultList 按分量进行分组,保证每次使用 statistics 更新的都是属于该分量的聚合结果。

因此在上层,我们需要一个 Map<String, List<Integer>> subPathToAggrIndexMap, 表明每个分量在结果集中的位置。

例如 select count(s1), count(s2), max_value(s1), min_value(s2) from root.sg.d1.vector1

则会聚合为一条路径,root.sg.d1.vector1[s1, s2] 和一个 Map, (s1 →  [0, 2], s1 → [1], s2→ [2])



对于利用数据值更新,为了最大程度的利用现有接口,使改动最小化,选择保持 void updateResultFromPageData(BatchData dataInThisPage) 不变,而将一个 VectorBatchData 拆解为多个分量的 subBatchData, 然后利用现有接口进行后续操作。

BatchData nextOverlappedPageData = seriesReader.nextPage();
BatchData[] subBatchData = nextOverlappedPageData.generateSubBatchData();
while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) {
updateUsingSubBatchData(SubAggrResultList, subBatchData)
seriesReader.nextIndex();
}






  • No labels