You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »


概述

旋转门算法是一种比较快速的线性拟合算法,常常用于实时数据库中对数据进行压缩,使存储容量大大的减少。

在实时数据库中,数据通常具有如下特点:1. 数据采集量大。2. 数据临近度高。如果不能对这些数据进行压缩,将对资源造成巨大的浪费。

旋转门算法作为线性拟合的一种简便算法,具有效率高、压缩比高、实现简单、误差可控制的优点,已成为一种专门算法。

IoTDB + SDT 的压缩+查询模式将参考 OSISoft Pi


参考资料:

算法图解

算法论文.


基本概念

  • Error Compression Deviation (CD) 压缩偏差

    • 一个数据点与线性趋势的最大差异值 E

    • 运行程序前,预先设定的E 绝对压缩偏差值

    • 在偏差范围内,数据被压缩并扔掉

    • 在偏差范围外,数据不被压缩,保存下来

  • Compression minimum (comp min)
    • segment 之间的最小时间间隔距离
    • 若 compMin = 0,记录每一个超过CD的点
    • 可以设定compMin 防止noisy point 占用存储空间
  • Compression maximum (comp max)
    • segment 之间的最大时间间隔距离
    • 不论CD 的值,若两点之间的时间距离 >= compMax,pi server会记录当前数据点并结束当前segment
    • 为了更精准的压缩数据

  • upperDoor,当前保留数据点值+CD为支点的门

    • 起始默认为关闭状态  Integer.min

    • 和curUpperSlope 进行比较,取最大值,只能逆时针打开

  • lowerDoor,当前保留数据点值-CD为支点的门

    • 起始默认为关闭状态 Integer.max 

    • 和curLowerSlope进行比较,取最小值,只能顺时针打开

  • curUpperSlope 

    • 当前数据点到segment starttime 的上斜率

  • curLowerSlope 

    • 当前数据点到segment starttime 的下斜率

  • 存储上一个数据点

    • 两扇门超过平行的时候

    • upperDoor >= lowerDoor 


上下斜率计算方式:

curUpperSlope = (curVal - lastStoredVal - CD) / (curTime - lastStoredTime)

curLowerSlope = (curVal - lastStoredVal + CD) / (curTime - lastStoredTime)

upperDoor = Math.max(curUpperSlope, upperDoor)

lowerDoor = Math.min(curLowerSlope, lowerDoor)


算法说明

绿色虚线: upperDoor, lowerDoor

灰色虚线:新节点加入时重新计算的upperDoor,lowerDoor

lowerDoor:当前segment 所有数据最小的下斜率

upperDoor:当前segment 所有数据最大的上斜率

红色实线:segment,实际上是存储 startpoint endpoint,后续解压缩时使用



如上图,数据加入及压缩流程:

  1. 新加入数据点p0,设置为lastStored,并保存下来

    1. lastStored = p0 当数据为第一个数据或者超过CD时,标记lastStored,并加入compressedList

  2. upperDoor, lowerDoor 起始默认为关闭的(绿色虚线)

    1. upperDoor 只能逆时针打开,lowerDoor 只能顺时针打开

  3. 新加入数据点p1,计算p1 的 upperSlope,lowerSlope

  4. 当前计算的upperSlope > upperDoor,lowerSlope < lowerDoor

    1. 两扇门分别打开,并更新其最大最小值

    2. 因为p1 在CD 范围内,不会被保存,标记为lastRead。lastStored 依旧为 p0

  5. 新加入数据点p2,重新计算upperSlope,lowerSlope。并更新upperDoor,lowerDoor

  6. 新加入数据点p3,重新计算upperSlope,lowerSlope。并更新upperDoor。不更新lowerDoor,因为 lowerSlope > lowerDoor,lowerDoor 只能顺时针打开

  7. 新加入数据点p4,重新计算upperSlope,lowerSlope。并更新lowerDoor,不更新upperDoor

  8. 每次计算完上下斜率,进行upperDoor lowerDoor 检查是否超过平行

  9. 加入p4 的时候。发现upperDoor >= lowerDoor,两扇门超过平行水平,数据范围已经超过了CD,需要保存上一个数据点lastRead p3

    1. 标记 lastStored = p3,compressedList 加入p3

    2. p3 作为segment0 endpoint

    3. p3 作为segment1 startpoint

    4. p4 和p3 进行比较,重新计算upperDoor,lowerDoor(绿色虚线)


压缩算法

  1. 第一个数据点需要保存

  2. 第二个及以后的数据点,需要计算当前到segment startpoint 的上下斜率,并和upperDoor,lowerDoor 进行比较

  3. upperDoor 只能逆时针打开,lowerDoor 只能顺时针打开

  4. 若两扇门处于或超过平行状态,并且当前点和上一存储点的时间间隔 >= comp min,存储上一个阅读的点,并开启新的segment

  5. 新的segment upperDoor,lowerDoor 计算通过当前数据和上一个segment endpoint 进行比较

  6. 若当前点和上一存储点的时间间隔 >= comp max,存储当前点,开启新的segment
  7. 若全部rawData 读取完毕,compressedList 只有第一个数据(所有数据偏差不超过CD)则加入最后一个读取的数据点


性能测试


同等压缩误差值CD和数据量,数据波动越小,压缩比越高


  • 源数据为规律的波动pi 值,数据量 = 10000,振幅 = 100

  • Compression deviation = 0.5

  • 存储数据:556

  • 压缩比:94.44%




OSISoft PI 查询方式

  • 不存压缩前的时间戳
    • 压缩后的数据是存储在数据归档中的实际原始数据。它被称为压缩,因为它已经通过压缩,并且保留了有意义的信息的值。如果由于压缩过程中丢失了一个值而没有得到准确的时间,则应调整压缩设置。
    • 统计数据为压缩后的stats 信息,与压缩的数据保持一致
  • 分为两种查询模式
    1. 若指定的时间戳,不存在数据点,根据时间戳和前后两个点的斜率,对数据进行插值
    1. 返回的压缩后的数据集,不进行解压
    1. 指定时间戳查询
    2. Range query

https://www.youtube.com/watch?v=ZMNIcKH5mgc&vl=en

https://www.youtube.com/watch?v=tr5KR499QyQ

Pi Compressed Data function



以下为,IoTDB + SDT 设计

SDT Encoder

  • List<Long> timestamps
  • List<T> values 支持
    • List<Long> longValues
    • List<Integer> intValues
    • List<Double> doubleValues
    • List<Float> floatValues

  • encode(time, value, timeOut, valueOut) 对不同数据类型的values 需要提供不同的接口
    • timestamps.add(time)
    • values.add(value)

  • flush(timeOut, valueOut)
    • Sdt 对timestamps 和values 进行压缩(根据CD,compMax,compMin)
    • 写入 byteCache
    • 只针对内存中的数据进行压缩,压缩后,更新stats

SDT Decoder

  • getAllSatisfiedPageData 调用 Decoder.readLong
  • 返回page data的时候,Sdt 不需要进行解压
    • Sdt decoder 直接返回timebuffer + valuebuffer, 在pageReader 中遍历返回的结果,pagedata.put(t, v)


  • public boolean hasNext(ByteBuffer buffer)
    • pageReader, while sdtDecoder.hasNext(timeBuffer)
    • 遍历timeBuffer,返回一个时间戳
    • 调用方式
  • public long readLong(ByteBuffer buffer)
    • pageReader, while sdtDecoder.hasNext(timeBuffer)
      • long aLong = sdtDecoder.readLong(valueBuffer);
      • pageData.put(t, aLong)
    • 调用方式

PageWriter

需要提供的变量

  • private Encoder sdtEncoder;
  • private PublicBAOS timeOut, valueOut;


在当前的接口基础上,需要增加:

  • public void write(long time, double value)
    • 刷入磁盘之前进行内存中的数据的压缩,同时更新压缩后的stats
    • 需要对4 种数据类型写接口 
    • 将time,value 加入sdtEncoder中
    • 更新内存中的stats(未压缩的数据)
  • public void write(long[] timestamps, long[] values, int batchSize) 
    • 需要对4 种数据类型写接口 
  • private void prepareEndWriteOnePage() throws IOException 
    • sdtEncoder.flush(timeOut, valueOut)
  • public ByteBuffer getUncompressedBytes() throws IOException 
    • buffer.put(timeOut.getBuf(), 0, timeOut.size());
    • buffer.put(valueOut.getBuf(), 0, valueOut.size());
    • pageData = getUncompressedBytes()


创建时间序列 -> 写入数据 -> 刷入磁盘,具体流程:

  • 创建时间序列的时候,设置encoding 方式
    • createTimeseries() -> new MeasurementMNode -> new MeasurementSchema(measurementName, dataType, encoding, type, props)
    • 当前timeEncoding 默认为Ts2Diff,写入tsfile的时候,time 和value 进行编码


  • 插入数据
    • 当前PageWriter.write(time, value) 分别对timeEncoder 和valueEncoder 进行编码,sdt 需要在此基础上,增加write(time, value) 的接口
    • Flush 的时候会调用 PageWriter.write(time, value)
    • Sdt encoding 的时候需要数据点的 time 和 value 的


                         

  • 刷入磁盘

                        

PageReader 

需要提供的变量

  • private Decoder sdtDecoder;
  • private ByteBuffer buffer;


查询将分为两种

  1. 不需要解压的查询
    1. 范围查询
      Select s1 where time > 3 and time < 10
      Group by
  2. 需要解压进行插值计算的查询
    1. 点查询
      Select s1 where time = 3
      1. 需要注意 time = 3 的前后两点,是否跨page
      2. 调用 SDTDecoder 
        1. S1 time = 3 的数据点经过压缩,并存在了磁盘,直接返回该点
        2. S1 time = 3 的数据点不存在,找到s1 time = 3 的前后两个数据点并进行插值计算

点查询

  1. Ie time = x,page1, page2, page3 ..
    1. return null
    2. 在该ts 第一个和最后一个时间戳之外,不进行插值计算
    1. 遍历pageReaders 的时候,记录 maxPrevPair, minNextPair 
    2. 若存在time = 3 的数据点,直接返回
    3. 若最后遍历完getValue == null,则通过 maxPrevPair, minNextPair 进行插值计算
    1. 遍历pageData 的时候,记录maxPrevPage, minNextPage
    2. 若生成pageReaders.size == 0,则通过 maxPrevPage, minNextPage 进行unpack page,decoder 返回插值计算的结果
    1. 若x < page1.startime || x > page3.endtime
    2. 若x >= page2.startime && x <= page2.endtime
    3. 若x >= page2.endtime && x <= page3.starttime



范围查询

  • 与当前逻辑一致,直接返回压缩后的结果集,不进行插值计算

聚合查询

  • 与当前逻辑一致,stats 是在encoding 之前进行计算的
  • 聚合查询调用stats,若查询发生在内存中(压缩前),stats 为压缩前的记录结果。flush 的时候进行压缩,并更新stats,保证磁盘上压缩后的结果,和stats 一致

一条数据的查询

  • 与当前一致,返回压缩后的结果集

多条数据的查询

  • 与当前一致,返回压缩后的结果集










  • No labels