THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
IoTDB + SDT 的压缩+查询模式将参考 OSISoft Pi
PI 压缩
- Compression deviation
- Compression minimum
- Interval 之间的最小时间间隔距离
- 若 compMin = 0,记录每一个超过CD的点
- 可以设定compMin 防止noisy point 占用存储空间
- Compression maximum
- Interval 之间的最大时间间隔距离
- 不论CD 的值,若两点之间的时间距离 >= compMax,pi server会记录当前数据点并结束当前segment
- 为了更精准的压缩数据
PI 查询
- 不存压缩前的时间戳
- 压缩后的数据是存储在数据归档中的实际原始数据。它被称为压缩,因为它已经通过压缩,并且保留了有意义的信息的值。如果由于压缩过程中丢失了一个值而没有得到准确的时间,则应调整压缩设置。
- 统计数据为压缩后的stats 信息,与压缩的数据保持一致
- 分为两种查询模式
- 指定时间戳查询
- 若指定的时间戳,不存在数据点,根据时间戳和前后两个点的斜率,对数据进行插值
- Range query
- 返回的压缩后的数据集,不进行解压
SDT Encoder
- List<Long> timestamps
- List<T> values 支持
- List<Long> longValues
- List<Integer> intValues
- List<Double> doubleValues
- List<Float> floatValues
- encode(time, value, timeOut, valueOut) 对不同数据类型的values 需要提供不同的接口
- timestamps.add(time)
- values.add(value)
- flush(timeOut, valueOut)
- Sdt 对timestamps 和values 进行压缩(根据CD,compMax,compMin)
- 写入 byteCache
- 只针对内存中的数据进行压缩,压缩后,更新stats
SDT Decoder
- getAllSatisfiedPageData 调用 Decoder.readLong
- 返回page data的时候,Sdt 不需要进行解压
- Sdt decoder 直接返回timebuffer + valuebuffer, 在pageReader 中遍历返回的结果,pagedata.put(t, v)
- public boolean hasNext(ByteBuffer buffer)
- 调用方式
- pageReader, while sdtDecoder.hasNext(timeBuffer)
- 遍历timeBuffer,返回一个时间戳
- public long readLong(ByteBuffer buffer)
- 调用方式
- pageReader, while sdtDecoder.hasNext(timeBuffer)
- long aLong = sdtDecoder.readLong(valueBuffer);
- pageData.put(t, aLong)
PageWriter
需要提供的变量
- private Encoder sdtEncoder;
- private PublicBAOS timeOut, valueOut;
在当前的接口基础上,需要增加:
- public void write(long time, double value)
- 需要对4 种数据类型写接口
- 将time,value 加入sdtEncoder中
- 更新内存中的stats(未压缩的数据)
- 刷入磁盘之前进行内存中的数据的压缩,同时更新压缩后的stats
- public void write(long[] timestamps, long[] values, int batchSize)
- 需要对4 种数据类型写接口
- private void prepareEndWriteOnePage() throws IOException
- sdtEncoder.flush(timeOut, valueOut)
- public ByteBuffer getUncompressedBytes() throws IOException
- pageData = getUncompressedBytes()
- buffer.put(timeOut.getBuf(), 0, timeOut.size());
- buffer.put(valueOut.getBuf(), 0, valueOut.size());
创建时间序列 -> 写入数据 -> 刷入磁盘,具体流程:
- 创建时间序列的时候,设置encoding 方式
- createTimeseries() -> new MeasurementMNode -> new MeasurementSchema(measurementName, dataType, encoding, type, props)
- 当前timeEncoding 默认为Ts2Diff,写入tsfile的时候,time 和value 进行编码
- 插入数据
- Flush 的时候会调用 PageWriter.write(time, value)
- Sdt encoding 的时候需要数据点的 time 和 value 的
- 当前PageWriter.write(time, value) 分别对timeEncoder 和valueEncoder 进行编码,sdt 需要在此基础上,增加write(time, value) 的接口
- 刷入磁盘
PageReader
需要提供的变量
- private Decoder sdtDecoder;
- private ByteBuffer buffer;
查询将分为两种
- 不需要解压的查询
- 范围查询
Select s1 where time > 3 and time < 10
Group by - 需要解压进行插值计算的查询
- 点查询
Select s1 where time = 3- 需要注意 time = 3 的前后两点,是否跨page
- 调用 SDTDecoder
- S1 time = 3 的数据点经过压缩,并存在了磁盘,直接返回该点
- S1 time = 3 的数据点不存在,找到s1 time = 3 的前后两个数据点并进行插值计算
- 点查询
点查询
- Ie time = x,page1, page2, page3 ..
- 若x < page1.startime || x > page3.endtime
- return null
- 在该ts 第一个和最后一个时间戳之外,不进行插值计算
- 若x >= page2.startime && x <= page2.endtime
- 遍历pageReaders 的时候,记录 maxPrevPair, minNextPair
- 若存在time = 3 的数据点,直接返回
- 若最后遍历完getValue == null,则通过 maxPrevPair, minNextPair 进行插值计算
- 若x >= page2.endtime && x <= page3.starttime
- 遍历pageData 的时候,记录maxPrevPage, minNextPage
- 若生成pageReaders.size == 0,则通过 maxPrevPage, minNextPage 进行unpack page,decoder 返回插值计算的结果
范围查询
- 与当前逻辑一致,直接返回压缩后的结果集,不进行插值计算
聚合查询
- 与当前逻辑一致,stats 是在encoding 之前进行计算的
- 聚合查询调用stats,若查询发生在内存中(压缩前),stats 为压缩前的记录结果。flush 的时候进行压缩,并更新stats,保证磁盘上压缩后的结果,和stats 一致
一条数据的查询
- 与当前一致,返回压缩后的结果集
多条数据的查询
- 与当前一致,返回压缩后的结果集