概述
旋转门算法是一种比较快速的线性拟合算法,常常用于实时数据库中对数据进行压缩,使存储容量大大的减少。
在实时数据库中,数据通常具有如下特点:1. 数据采集量大。2. 数据临近度高。如果不能对这些数据进行压缩,将对资源造成巨大的浪费。
旋转门算法作为线性拟合的一种简便算法,具有效率高、压缩比高、实现简单、误差可控制的优点,已成为一种专门算法。
IoTDB + SDT 的压缩+查询模式将参考 OSISoft Pi
参考资料:
算法论文.
基本概念
Error Compression Deviation (CD) 压缩偏差
一个数据点与线性趋势的最大差异值 E
运行程序前,预先设定的E 绝对压缩偏差值
在偏差范围内,数据被压缩并扔掉
在偏差范围外,数据不被压缩,保存下来
- Compression minimum (comp min)
- segment 之间的最小时间间隔距离
- 若 compMin = 0,记录每一个超过CD的点
- 可以设定compMin 防止noisy point 占用存储空间
- Compression maximum (comp max)
- segment 之间的最大时间间隔距离
- 不论CD 的值,若两点之间的时间距离 >= compMax,pi server会记录当前数据点并结束当前segment
- 为了更精准的压缩数据
upperDoor,当前保留数据点值+CD为支点的门
起始默认为关闭状态 Integer.min
和curUpperSlope 进行比较,取最大值,只能逆时针打开
lowerDoor,当前保留数据点值-CD为支点的门
起始默认为关闭状态 Integer.max
和curLowerSlope进行比较,取最小值,只能顺时针打开
curUpperSlope
当前数据点到segment starttime 的上斜率
curLowerSlope
当前数据点到segment starttime 的下斜率
存储上一个数据点
两扇门超过平行的时候
upperDoor >= lowerDoor
上下斜率计算方式:
curUpperSlope = (curVal - lastStoredVal - CD) / (curTime - lastStoredTime)
curLowerSlope = (curVal - lastStoredVal + CD) / (curTime - lastStoredTime)
upperDoor = Math.max(curUpperSlope, upperDoor)
lowerDoor = Math.min(curLowerSlope, lowerDoor)
算法说明
绿色虚线: upperDoor, lowerDoor
灰色虚线:新节点加入时重新计算的upperDoor,lowerDoor
lowerDoor:当前segment 所有数据最小的下斜率
upperDoor:当前segment 所有数据最大的上斜率
红色实线:segment,实际上是存储 startpoint endpoint,后续解压缩时使用
如上图,数据加入及压缩流程:
新加入数据点p0,设置为lastStored,并保存下来
lastStored = p0 当数据为第一个数据或者超过CD时,标记lastStored,并加入compressedList
upperDoor, lowerDoor 起始默认为关闭的(绿色虚线)
upperDoor 只能逆时针打开,lowerDoor 只能顺时针打开
新加入数据点p1,计算p1 的 upperSlope,lowerSlope
当前计算的upperSlope > upperDoor,lowerSlope < lowerDoor
两扇门分别打开,并更新其最大最小值
因为p1 在CD 范围内,不会被保存,标记为lastRead。lastStored 依旧为 p0
新加入数据点p2,重新计算upperSlope,lowerSlope。并更新upperDoor,lowerDoor
新加入数据点p3,重新计算upperSlope,lowerSlope。并更新upperDoor。不更新lowerDoor,因为 lowerSlope > lowerDoor,lowerDoor 只能顺时针打开
新加入数据点p4,重新计算upperSlope,lowerSlope。并更新lowerDoor,不更新upperDoor
每次计算完上下斜率,进行upperDoor lowerDoor 检查是否超过平行
加入p4 的时候。发现upperDoor >= lowerDoor,两扇门超过平行水平,数据范围已经超过了CD,需要保存上一个数据点lastRead p3
标记 lastStored = p3,compressedList 加入p3
p3 作为segment0 endpoint
p3 作为segment1 startpoint
p4 和p3 进行比较,重新计算upperDoor,lowerDoor(绿色虚线)
压缩算法
第一个数据点需要保存
第二个及以后的数据点,需要计算当前到segment startpoint 的上下斜率,并和upperDoor,lowerDoor 进行比较
upperDoor 只能逆时针打开,lowerDoor 只能顺时针打开
若两扇门处于或超过平行状态,并且当前点和上一存储点的时间间隔 >= comp min,存储上一个阅读的点,并开启新的segment
新的segment upperDoor,lowerDoor 计算通过当前数据和上一个segment endpoint 进行比较
- 若当前点和上一存储点的时间间隔 >= comp max,存储当前点,开启新的segment
若全部rawData 读取完毕,compressedList 只有第一个数据(所有数据偏差不超过CD)则加入最后一个读取的数据点
性能测试
同等压缩误差值CD和数据量,数据波动越小,压缩比越高
源数据为规律的波动pi 值,数据量 = 10000,振幅 = 100
Compression deviation = 0.5
存储数据:556
压缩比:94.44%
OSISoft PI 查询方式
- 不存压缩前的时间戳
- 压缩后的数据是存储在数据归档中的实际原始数据。它被称为压缩,因为它已经通过压缩,并且保留了有意义的信息的值。如果由于压缩过程中丢失了一个值而没有得到准确的时间,则应调整压缩设置。
- 统计数据为压缩后的stats 信息,与压缩的数据保持一致
- 分为两种查询模式
- 若指定的时间戳,不存在数据点,根据时间戳和前后两个点的斜率,对数据进行插值
- 返回的压缩后的数据集,不进行解压
- 指定时间戳查询
- Range query
https://www.youtube.com/watch?v=ZMNIcKH5mgc&vl=en
https://www.youtube.com/watch?v=tr5KR499QyQ
以下为,IoTDB + SDT 设计
SDT Encoder
- List<Long> timestamps
- List<T> values 支持
- List<Long> longValues
- List<Integer> intValues
- List<Double> doubleValues
- List<Float> floatValues
- encode(time, value, timeOut, valueOut) 对不同数据类型的values 需要提供不同的接口
- timestamps.add(time)
- values.add(value)
- flush(timeOut, valueOut)
- Sdt 对timestamps 和values 进行压缩(根据CD,compMax,compMin)
- 写入 byteCache
- 只针对内存中的数据进行压缩,压缩后,更新stats
SDT Decoder
- getAllSatisfiedPageData 调用 Decoder.readLong
- 返回page data的时候,Sdt 不需要进行解压
- Sdt decoder 直接返回timebuffer + valuebuffer, 在pageReader 中遍历返回的结果,pagedata.put(t, v)
- public boolean hasNext(ByteBuffer buffer)
- pageReader, while sdtDecoder.hasNext(timeBuffer)
- 遍历timeBuffer,返回一个时间戳
- 调用方式
- public long readLong(ByteBuffer buffer)
- pageReader, while sdtDecoder.hasNext(timeBuffer)
- long aLong = sdtDecoder.readLong(valueBuffer);
- pageData.put(t, aLong)
- 调用方式
PageWriter
需要提供的变量
- private Encoder sdtEncoder;
- private PublicBAOS timeOut, valueOut;
在当前的接口基础上,需要增加:
- public void write(long time, double value)
- 刷入磁盘之前进行内存中的数据的压缩,同时更新压缩后的stats
- 需要对4 种数据类型写接口
- 将time,value 加入sdtEncoder中
- 更新内存中的stats(未压缩的数据)
- public void write(long[] timestamps, long[] values, int batchSize)
- 需要对4 种数据类型写接口
- private void prepareEndWriteOnePage() throws IOException
- sdtEncoder.flush(timeOut, valueOut)
- public ByteBuffer getUncompressedBytes() throws IOException
- buffer.put(timeOut.getBuf(), 0, timeOut.size());
- buffer.put(valueOut.getBuf(), 0, valueOut.size());
- pageData = getUncompressedBytes()
创建时间序列 -> 写入数据 -> 刷入磁盘,具体流程:
- 创建时间序列的时候,设置encoding 方式
- createTimeseries() -> new MeasurementMNode -> new MeasurementSchema(measurementName, dataType, encoding, type, props)
- 当前timeEncoding 默认为Ts2Diff,写入tsfile的时候,time 和value 进行编码
- 插入数据
- 当前PageWriter.write(time, value) 分别对timeEncoder 和valueEncoder 进行编码,sdt 需要在此基础上,增加write(time, value) 的接口
- Flush 的时候会调用 PageWriter.write(time, value)
- Sdt encoding 的时候需要数据点的 time 和 value 的
- 刷入磁盘
PageReader
需要提供的变量
- private Decoder sdtDecoder;
- private ByteBuffer buffer;
查询将分为两种
- 不需要解压的查询
- 范围查询
Select s1 where time > 3 and time < 10
Group by - 需要解压进行插值计算的查询
- 点查询
Select s1 where time = 3- 需要注意 time = 3 的前后两点,是否跨page
- 调用 SDTDecoder
- S1 time = 3 的数据点经过压缩,并存在了磁盘,直接返回该点
- S1 time = 3 的数据点不存在,找到s1 time = 3 的前后两个数据点并进行插值计算
- 点查询
点查询
- Ie select s1 from root.sg.d where time = x
- 若x < page1.startime || x > page3.endtime
- 在该ts 第一个和最后一个时间戳之外,不进行插值计算
- 若x >= page2.startime && x <= page2.endtime
- 遍历pageReaders 的时候,记录 maxPrevPair, minNextPair
- 若存在time = 3 的数据点,直接返回
- 若最后遍历完getValue == null,则通过 maxPrevPair, minNextPair 进行插值计算
- 若x >= page2.endtime && x <= page3.starttime
- 遍历pageData 的时候,记录maxPrevPage, minNextPage
- 若生成pageReaders.size == 0,则通过 maxPrevPage, minNextPage 进行unpack page,decoder 返回插值计算的结果
范围查询
- 与当前逻辑一致,直接返回压缩后的结果集,不进行插值计算
聚合查询
- 与当前逻辑一致,stats 是在encoding 之前进行计算的
- 聚合查询调用stats,若查询发生在内存中(压缩前),stats 为压缩前的记录结果。flush 的时候进行压缩,并更新stats,保证磁盘上压缩后的结果,和stats 一致
一条数据的查询
- 与当前一致,返回压缩后的结果集
多条数据的查询
- 与当前一致,返回压缩后的结果集