Downsampling query
org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet
GroupByEngineDataSet
, this class contains the following fields:
protected long queryId
private long interval
private long slidingStep
The following two fields are for the entire query, and the time period is left closed and right open, which is [startTime, endTime)
:
private long startTime
private long endTime
The following fields are for the current segment, and the time period is left closed and right open, which is [startTime, endTime)
:
protected long curStartTime;
protected long curEndTime;
private int usedIndex;
protected boolean hasCachedTimeInterval;
The core method of GroupByEngineDataSet
is very easy. First, determine if there is a next segment based on whether there is a cached time period, and return true
; if not, calculate the segmentation start time and increase usedIndex
by 1. If the segment start time has exceeded the query end time, return false
; otherwise, calculate the query end time, set hasCachedTimeInterval
to true
, and returntrue
:
protected boolean hasNextWithoutConstraint() {
if (hasCachedTimeInterval) {
return true;
}
curStartTime = usedIndex * slidingStep + startTime;
usedIndex++;
if (curStartTime < endTime) {
hasCachedTimeInterval = true;
curEndTime = Math.min(curStartTime + interval, endTime);
return true;
} else {
return false;
}
}
Downsampling query without value filter
The downsampling query logic without value filter is mainly in the GroupByWithoutValueFilterDataSet
class, which inheritsGroupByEngineDataSet
.
This class has the following key fields:
private Map <Path, GroupByExecutor> pathExecutors classifies aggregate functions for the same
Path
and encapsulates them asGroupByExecutor
,GroupByExecutor
encapsulates the data calculation logic and method of each Path, which will be described laterprivate TimeRange timeRange encapsulates the time interval of each calculation into an object, which is used to determine whether Statistics can directly participate in the calculation
private Filter timeFilter Generates a user-defined query interval as a Filter object, which is used to filter the available files, chunks, and pages.
First, in the initialization initGroupBy()
method, the timeFilter
is calculated based on the expression, and GroupByExecutor
is generated for each path
.
First, in the initialization initGroupBy()
method, the timeFilter
is calculated based on the expression, and GroupByExecutor
is generated for each path
. The following method is used to convert the result list into a RowRecord. Note that when there are no results in the list, add null
to the RowRecord:
for (AggregateResult res : fields) {
if (res == null) {
record.addField(null);
continue;
}
record.addField(res.getResult(), res.getResultDataType());
}
GroupByExecutor
Encapsulating the calculation method of all aggregate functions under the same path, this class has the following key fields:
private IAggregateReader reader the
SeriesAggregateReader
used to read the currentPath
dataprivate BatchData preCachedData Every time the data read from
Reader
is a batch, and it is likely to exceed the current time period. ThisBatchData
will be cached for next useprivate List<Pair<AggregateResult, Integer>> results stores all aggregation methods in the current
Path
, for example:select count(a), sum(a), avg(b)
,count
andsum
can be stored together. TheInteger
on the right is used to convert the result set to the order of the user query before converting it to RowRecord.
Main method
//Read data from the reader and calculate the main method of this class.
private List<Pair<AggregateResult, Integer>> calcResult() throws IOException, QueryProcessException;
//Add aggregation operation for current path
private void addAggregateResult(AggregateResult aggrResult, int index);
//Determine whether the current path has completed all aggregation calculations
private boolean isEndCalc();
//Calculate results from BatchData that did not run out of cache last calculation
private boolean calcFromCacheData() throws IOException;
//Calculation using BatchData
private void calcFromBatch(BatchData batchData) throws IOException;
//Calculate results directly using Page or Chunk's Statistics
private void calcFromStatistics(Statistics statistics) throws QueryProcessException;
//Clear all calculation results
private void resetAggregateResults();
//Iterate through and calculate the data in the page
private boolean readAndCalcFromPage() throws IOException, QueryProcessException;
In GroupByExecutor
, because different aggregate functions of the same path use the same data, the entry method calcResult
is responsible for reading all the data of the Path
. The retrieved data then calls the calcFromBatch
method to complete the calculation of BatchData
through all the aggregate functions.
The calcResult
method returns all AggregateResult under the current Path and the position of the current aggregated value in the user query order. Its main logic is:
//Calculate the data left over from the last time, and end the calculation if you can get the results directly
if (calcFromCacheData()) {
return results;
}
//Because a chunk contains multiple pages, the page of the current chunk must be used up before the next chunk is opened.
if (readAndCalcFromPage()) {
return results;
}
//If the remaining data is calculated, open a new chunk to continue the calculation.
while (reader.hasNextChunk()) {
Statistics chunkStatistics = reader.currentChunkStatistics();
// Determine if Statistics is available and perform calculations
....
// Skip current chunk
reader.skipCurrentChunk();
// End calculation if all results have been obtained
if (isEndCalc()) {
return true;
}
continue;
}
//If you cannot use chunkStatistics, you need to use page data to calculate
if (readAndCalcFromPage()) {
return results;
}
}
The readAndCalcFromPage
method is to obtain the page data from the currently opened chunk and calculate the aggregate result. Returns true when all calculations are completed, otherwise returns false. The main logic:
while (reader.hasNextPage()) {
Statistics pageStatistics = reader.currentPageStatistics();
//PageStatistics can only be used if the page does not intersect with other pages
if (pageStatistics != null) {
// Determine if Statistics is available and perform calculations
....
// Skip current page
reader.skipCurrentPage();
// End calculation if all results have been obtained
if (isEndCalc()) {
return true;
}
continue;
}
}
// When Statistics is not available, you can only fetch all data for calculation
BatchData batchData = reader.nextPage();
if (batchData == null || !batchData.hasCurrent()) {
continue;
}
// If the page just opened exceeds the time range, the data retrieved is cached and the calculation is directly ended.
if (batchData.currentTime() >= curEndTime) {
preCachedData = batchData;
return true;
}
//Perform calculations
calcFromBatch(batchData);
...
}
The calcFromBatch
method is to traverse all the aggregate functions to calculate the retrieved BatchData. The main logic is:
for (Pair<AggregateResult, Integer> result : results) {
//If a function has already been calculated, it will not be calculated, such as the minimum calculation.
if (result.left.isCalculatedAggregationResult()) {
continue;
}
// Perform calculations
....
}
//Determine if the data in the current batchdata can still be used next time, if it can be added to the cache
if (batchData.getMaxTimestamp() >= curEndTime) {
preCachedData = batchData;
}
Aggregated query with value filter
The downsampling query logic with value filtering conditions is mainly in the GroupByWithValueFilterDataSet
class, which inherits GroupByEngineDataSet
.
This class has the following key fields:
private List<IReaderByTimestamp> allDataReaderList
private GroupByPlan groupByTimePlan
private TimeGenerator timestampGenerator
private long timestamp is used to cache timestamp for the next group by partition
private boolean hasCachedTimestamp used to determine whether there is a timestamp cache for the next group by partition
private int timeStampFetchSize is the size of the group by calculating the batch
First, in the initialization initGroupBy ()
method, create a timestampGenerator
based on the expression; then create a SeriesReaderByTimestamp
for each time series and place it in the allDataReaderList
list. After initialization is complete, call the nextWithoutConstraint ()
method to update the result. If timestamp is cached for the next group by partition and the time meets the requirements, add it to timestampArray
, otherwise return the aggregateResultList
result directly; if timestamp is not cached for the next group by partition, use timestampGenerator
to traverse:
while (timestampGenerator.hasNext()) {
// Call constructTimeArrayForOneCal () method to get a list of timestamp
timeArrayLength = constructTimeArrayForOneCal(timestampArray, timeArrayLength);
// Call the updateResultUsingTimestamps () method to calculate the aggregate result using the timestamp list
for (int i = 0; i < paths.size(); i++) {
aggregateResultList.get(i).updateResultUsingTimestamps(
timestampArray, timeArrayLength, allDataReaderList.get(i));
}
timeArrayLength = 0;
// Determine if it is over
if (timestamp >= curEndTime) {
hasCachedTimestamp = true;
break;
}
}
The constructTimeArrayForOneCal ()
method traverses timestampGenerator to build a list of timestamps:
for (int cnt = 1; cnt < timeStampFetchSize && timestampGenerator.hasNext(); cnt++) {
timestamp = timestampGenerator.next();
if (timestamp < curEndTime) {
timestampArray[timeArrayLength++] = timestamp;
} else {
hasCachedTimestamp = true;
break;
}
}
Aggregated query with level
After down-frequency query, we could also to count the total number of points of
each node at the given level in current Metadata Tree.
The logic is in the GroupByTimeDataSet
class.
In the beginning, get the final paths group by level and the origin path index to final path.
For example, we could get final path
root.sg1
byroot.sg1.d1.s0,root.sg1.d2.s1
andlevel=1
.Then, get the down-frequency query result: RowRecord.
Finally, merge each RowRecord to NewRecord, which has fields like <final path, count>.
For example, we will get new RowRecord
<root.sg1,7>
by<root.sg1.d1.s0, 3>, <root.sg1.d2.s1, 4>
and level=1.
Attention:
only support count aggregation