Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents


概述

旋转门算法是一种比较快速的线性拟合算法,常常用于实时数据库中对数据进行压缩,使存储容量大大的减少。

在实时数据库中,数据通常具有如下特点:1. 数据采集量大。2. 数据临近度高。如果不能对这些数据进行压缩,将对资源造成巨大的浪费。

旋转门算法作为线性拟合的一种简便算法,具有效率高、压缩比高、实现简单、误差可控制的优点,已成为一种专门算法。

IoTDB + SDT 的压缩+查询模式将参考 OSISoft Pi

...

PI 压缩

...


时间复杂度:O(1)

空间复杂度:O(1)



参考资料:

算法图解

算法论文


基本概念

  • Error Compression Deviation (CD) 压缩偏差

    • 一个数据点与线性趋势的最大差异值 E

    • 运行程序前,预先设定的E 绝对压缩偏差值

    • 在偏差范围内,数据被压缩并扔掉

    • 在偏差范围外,数据不被压缩,保存下来

  • Compression minimum (comp min)
    • segment (存储的两个点为segment 的起点和终点)

...

    • 之间的最小时间间隔距离
    • 若 compMin = 0,记录每一个超过CD的点
    • 可以设定compMin 防止noisy point 占用存储空间
  • Compression maximum 

...

  • (comp max)
    • segment 之间的最大时间间隔距离
    • 不论CD 的值,若两点之间的时间距离 >= compMax,pi server会记录当前数据点并结束当前segment
    • 为了更精准的压缩数据

PI 查询

  • 不存压缩前的时间戳
    • 压缩后的数据是存储在数据归档中的实际原始数据。它被称为压缩,因为它已经通过压缩,并且保留了有意义的信息的值。如果由于压缩过程中丢失了一个值而没有得到准确的时间,则应调整压缩设置。
    • 统计数据为压缩后的stats 信息,与压缩的数据保持一致
  • 分为两种查询模式
    1. 指定时间戳查询
      1. 若指定的时间戳,不存在数据点,根据时间戳和前后两个点的斜率,对数据进行插值
    2. Range query
      1. 返回的压缩后的数据集,不进行解压

SDT Encoder

...

  • Sdt 对timestamps 和values 进行压缩(根据CD,compMax,compMin)
  • 写入 byteCache
  • 只针对内存中的数据进行压缩,压缩后,更新stats

SDT Decoder

  • getAllSatisfiedPageData 调用 Decoder.readLong
  • 返回page data的时候,Sdt 不需要进行解压
    • Sdt decoder 直接返回timebuffer + valuebuffer, 在pageReader 中遍历返回的结果,pagedata.put(t, v)

  • upperDoor,当前保留数据点值+CD为支点的门

    • 起始默认为关闭状态  Integer.min

    • 和curUpperSlope 进行比较,取最大值,只能逆时针打开

  • lowerDoor,当前保留数据点值-CD为支点的门

    • 起始默认为关闭状态 Integer.max 

    • 和curLowerSlope进行比较,取最小值,只能顺时针打开

  • curUpperSlope 

    • 当前数据点到segment starttime 的上斜率

  • curLowerSlope 

    • 当前数据点到segment starttime 的下斜率

  • 存储上一个数据点

    • 两扇门超过平行的时候

    • upperDoor >= lowerDoor 


上下斜率计算方式:

curUpperSlope = (curVal - lastStoredVal - CD) / (curTime - lastStoredTime)

curLowerSlope = (curVal - lastStoredVal + CD) / (curTime - lastStoredTime)

upperDoor = Math.max(curUpperSlope, upperDoor)

lowerDoor = Math.min(curLowerSlope, lowerDoor)


算法说明

Image Added

绿色虚线: upperDoor, lowerDoor

灰色虚线:新节点加入时重新计算的upperDoor,lowerDoor

lowerDoor:当前segment 所有数据最小的下斜率

upperDoor:当前segment 所有数据最大的上斜率

红色实线:segment,实际上是存储 startpoint endpoint,后续解压缩时使用



如上图,数据加入及压缩流程:

  1. 新加入数据点p0,设置为lastStored,并保存下来

    1. lastStored = p0 当数据为第一个数据或者超过CD时,标记lastStored,并加入compressedList

  2. upperDoor, lowerDoor 起始默认为关闭的(绿色虚线)

    1. upperDoor 只能逆时针打开,lowerDoor 只能顺时针打开

  3. 新加入数据点p1,计算p1 的 upperSlope,lowerSlope

  4. 当前计算的upperSlope > upperDoor,lowerSlope < lowerDoor

    1. 两扇门分别打开,并更新其最大最小值

    2. 因为p1 在CD 范围内,不会被保存,标记为lastRead。lastStored 依旧为 p0

  5. 新加入数据点p2,重新计算upperSlope,lowerSlope。并更新upperDoor,lowerDoor

  6. 新加入数据点p3,重新计算upperSlope,lowerSlope。并更新upperDoor。不更新lowerDoor,因为 lowerSlope > lowerDoor,lowerDoor 只能顺时针打开

  7. 新加入数据点p4,重新计算upperSlope,lowerSlope。并更新lowerDoor,不更新upperDoor

  8. 每次计算完上下斜率,进行upperDoor lowerDoor 检查是否超过平行

  9. 加入p4 的时候。发现upperDoor >= lowerDoor,两扇门超过平行水平,数据范围已经超过了CD,需要保存上一个数据点lastRead p3

    1. 标记 lastStored = p3,compressedList 加入p3

    2. p3 作为segment0 endpoint

    3. p3 作为segment1 startpoint

    4. p4 和p3 进行比较,重新计算upperDoor,lowerDoor(绿色虚线)


压缩算法

  1. 第一个数据点需要保存

  2. 第二个及以后的数据点,通过比较last read point

    1. 计算当前到segment startpoint 的上斜率,并和upperDoor 进行比较

    2. 计算当前到segment startpoint 的下斜率,并和lowerDoor进行比较
  3. upperDoor 只能逆时针打开,lowerDoor 只能顺时针打开

  4. 若两扇门处于或超过平行状态,并且当前点和上一存储点的时间间隔 >= comp min,存储上一个阅读的点,并开启新的segment

  5. 新的segment upperDoor,lowerDoor 计算通过当前数据和上一个segment endpoint 进行比较

  6. 若当前点和上一存储点的时间间隔 >= comp max,存储当前点,开启新的segment
  7. 若当前点和上一存储点的时间间隔 <= comp min,不存储当前点,防止noisy point
  8. 若全部rawData 读取完毕,compressedList 只有第一个数据(所有数据偏差不超过CD)则加入最后一个读取的数据点


性能测试


同等压缩误差值CD和数据量,数据波动越小,压缩比越高


  • 源数据为规律的波动pi 值,数据量 = 10000,振幅 = 100

  • Compression deviation = 0.5

  • 存储数据:556

  • 压缩比:94.44%


Image Added



OSISoft PI 查询方式

  • range query
    • 返回的是压缩后的数据点
    • 压缩后的数据是存储在数据归档中的实际原始数据。它被称为压缩,因为它已经通过压缩,并且保留了有意义的信息的值。如果由于压缩过程中丢失了一个值而没有得到准确的时间,则应调整压缩设置。
    • 统计数据为压缩后的统计信息,statistics与压缩后的数据保持一致
  • fill 插值查询
    • 指定时间戳,若不存在,根据前后两点的斜率,进行插值计算
    • 在iotdb 中,用户需要使用fill 进行插值计算

https://www.youtube.com/watch?v=ZMNIcKH5mgc&vl=en

https://www.youtube.com/watch?v=tr5KR499QyQ

Pi Compressed Data function




SDT Encoder

以下为,IoTDB + SDT 设计

  • SDT 需要在当前encoder的上一层,对数据进行压缩,pageWriter 中会调用默认的编码方式,对压缩后的数据进行编码
  • SDT 编码,将支持四种数据类型:float, double, int, long
  • 查询解码时,因为返回的是经过sdt 过滤后的数据信息,不需要进行解码,与当前查询的方式一致,不进行更改
  • stats 统计信息,在pageWriter.write() 压缩后进行更新,chunk中的stats 信息是集合了page stats,所以在内存中丢掉数据,stats不会有不一致现象

Image Added

TSEncoding

  • 在现有的编码类型基础上,将会增加五种编码类型
    • 若用户使用sdt 编码,根据数据类型设置encoding enum,  gorilla_sdt, plain_sdt, .... 
    • 若用户不使用sdt 编码,根据数据类型设置encoding enum,  gorilla, plain, ...

Image Added

ChunkWriterImpl

以下write 接口,都会进行 是否为sdtEncoding 的判断

public void write(long time, int value)

  • public boolean hasNext(ByteBuffer buffer)
    • 调用方式
      • pageReader, while sdtDecoder.hasNext(timeBuffer)
      • 遍历timeBuffer,返回一个时间戳
  • public long readLong(ByteBuffer buffer)
    • 调用方式
      • pageReader, while sdtDecoder.hasNext(timeBuffer)
        • long aLong = sdtDecoder.readLong(valueBuffer);
        • pageData.put(t, aLong)

PageWriter

需要提供的变量

  • private Encoder sdtEncoder;
  • private PublicBAOS timeOut, valueOut;

...

public void write(long time,

...

float value)

...

  • 刷入磁盘之前进行内存中的数据的压缩,同时更新压缩后的stats

public void write(long

...

time, long

...

  • 需要对4 种数据类型写接口 

...

  • sdtEncoder.flush(timeOut, valueOut)

...

  • pageData = getUncompressedBytes()
    • buffer.put(timeOut.getBuf(), 0, timeOut.size());
    • buffer.put(valueOut.getBuf(), 0, valueOut.size());

创建时间序列 -> 写入数据 -> 刷入磁盘,具体流程:

  • 创建时间序列的时候,设置encoding 方式
    • createTimeseries() -> new MeasurementMNode -> new MeasurementSchema(measurementName, dataType, encoding, type, props)
    • 当前timeEncoding 默认为Ts2Diff,写入tsfile的时候,time 和value 进行编码
  • 插入数据
    • Flush 的时候会调用 PageWriter.write(time, value)
    • Sdt encoding 的时候需要数据点的 time 和 value 的
      • 当前PageWriter.write(time, value) 分别对timeEncoder 和valueEncoder 进行编码,sdt 需要在此基础上,增加write(time, value) 的接口

...

value)

public void write(long time, double value)


Image Added


Table of Contents


概述

旋转门算法是一种比较快速的线性拟合算法,常常用于实时数据库中对数据进行压缩,使存储容量大大的减少。

在实时数据库中,数据通常具有如下特点:1. 数据采集量大。2. 数据临近度高。如果不能对这些数据进行压缩,将对资源造成巨大的浪费。

旋转门算法作为线性拟合的一种简便算法,具有效率高、压缩比高、实现简单、误差可控制的优点,已成为一种专门算法。

IoTDB + SDT 的压缩+查询模式将参考 OSISoft Pi


时间复杂度:O(1)

空间复杂度:O(1)



参考资料:

算法图解

算法论文


基本概念

  • Error Compression Deviation (CD) 压缩偏差

    • 一个数据点与线性趋势的最大差异值 E

    • 运行程序前,预先设定的E 绝对压缩偏差值

    • 在偏差范围内,数据被压缩并扔掉

    • 在偏差范围外,数据不被压缩,保存下来

  • Compression minimum (comp min)
    • segment (存储的两个点为segment 的起点和终点) 之间的最小时间间隔距离
    • 若 compMin = 0,记录每一个超过CD的点
    • 可以设定compMin 防止noisy point 占用存储空间
  • Compression maximum (comp max)
    • segment 之间的最大时间间隔距离
    • 不论CD 的值,若两点之间的时间距离 >= compMax,pi server会记录当前数据点并结束当前segment
    • 为了更精准的压缩数据

  • upperDoor,当前保留数据点值+CD为支点的门

    • 起始默认为关闭状态  Integer.min

    • 和curUpperSlope 进行比较,取最大值,只能逆时针打开

  • lowerDoor,当前保留数据点值-CD为支点的门

    • 起始默认为关闭状态 Integer.max 

    • 和curLowerSlope进行比较,取最小值,只能顺时针打开

  • curUpperSlope 

    • 当前数据点到segment starttime 的上斜率

  • curLowerSlope 

    • 当前数据点到segment starttime 的下斜率

  • 存储上一个数据点

    • 两扇门超过平行的时候

    • upperDoor >= lowerDoor 


上下斜率计算方式:

curUpperSlope = (curVal - lastStoredVal - CD) / (curTime - lastStoredTime)

curLowerSlope = (curVal - lastStoredVal + CD) / (curTime - lastStoredTime)

upperDoor = Math.max(curUpperSlope, upperDoor)

lowerDoor = Math.min(curLowerSlope, lowerDoor)


算法说明

Image Added

绿色虚线: upperDoor, lowerDoor

灰色虚线:新节点加入时重新计算的upperDoor,lowerDoor

lowerDoor:当前segment 所有数据最小的下斜率

upperDoor:当前segment 所有数据最大的上斜率

红色实线:segment,实际上是存储 startpoint endpoint,后续解压缩时使用



如上图,数据加入及压缩流程:

  1. 新加入数据点p0,设置为lastStored,并保存下来

    1. lastStored = p0 当数据为第一个数据或者超过CD时,标记lastStored,并加入compressedList

  2. upperDoor, lowerDoor 起始默认为关闭的(绿色虚线)

    1. upperDoor 只能逆时针打开,lowerDoor 只能顺时针打开

  3. 新加入数据点p1,计算p1 的 upperSlope,lowerSlope

  4. 当前计算的upperSlope > upperDoor,lowerSlope < lowerDoor

    1. 两扇门分别打开,并更新其最大最小值

    2. 因为p1 在CD 范围内,不会被保存,标记为lastRead。lastStored 依旧为 p0

  5. 新加入数据点p2,重新计算upperSlope,lowerSlope。并更新upperDoor,lowerDoor

  6. 新加入数据点p3,重新计算upperSlope,lowerSlope。并更新upperDoor。不更新lowerDoor,因为 lowerSlope > lowerDoor,lowerDoor 只能顺时针打开

  7. 新加入数据点p4,重新计算upperSlope,lowerSlope。并更新lowerDoor,不更新upperDoor

  8. 每次计算完上下斜率,进行upperDoor lowerDoor 检查是否超过平行

  9. 加入p4 的时候。发现upperDoor >= lowerDoor,两扇门超过平行水平,数据范围已经超过了CD,需要保存上一个数据点lastRead p3

    1. 标记 lastStored = p3,compressedList 加入p3

    2. p3 作为segment0 endpoint

    3. p3 作为segment1 startpoint

    4. p4 和p3 进行比较,重新计算upperDoor,lowerDoor(绿色虚线)


压缩算法

  1. 第一个数据点需要保存

  2. 第二个及以后的数据点,通过比较last read point

    1. 计算当前到segment startpoint 的上斜率,并和upperDoor 进行比较

    2. 计算当前到segment startpoint 的下斜率,并和lowerDoor进行比较
  3. upperDoor 只能逆时针打开,lowerDoor 只能顺时针打开

  4. 若两扇门处于或超过平行状态,并且当前点和上一存储点的时间间隔 >= comp min,存储上一个阅读的点,并开启新的segment

  5. 新的segment upperDoor,lowerDoor 计算通过当前数据和上一个segment endpoint 进行比较

  6. 若当前点和上一存储点的时间间隔 >= comp max,存储当前点,开启新的segment
  7. 若当前点和上一存储点的时间间隔 <= comp min,不存储当前点,防止noisy point
  8. 若全部rawData 读取完毕,compressedList 只有第一个数据(所有数据偏差不超过CD)则加入最后一个读取的数据点


性能测试


同等压缩误差值CD和数据量,数据波动越小,压缩比越高


  • 源数据为规律的波动pi 值,数据量 = 10000,振幅 = 100

  • Compression deviation = 0.5

  • 存储数据:556

  • 压缩比:94.44%


Image Added



OSISoft PI 查询方式

  • range query
    • 返回的是压缩后的数据点
    • 压缩后的数据是存储在数据归档中的实际原始数据。它被称为压缩,因为它已经通过压缩,并且保留了有意义的信息的值。如果由于压缩过程中丢失了一个值而没有得到准确的时间,则应调整压缩设置。
    • 统计数据为压缩后的统计信息,statistics与压缩后的数据保持一致
  • fill 插值查询
    • 指定时间戳,若不存在,根据前后两点的斜率,进行插值计算
    • 在iotdb 中,用户需要使用fill 进行插值计算

https://www.youtube.com/watch?v=ZMNIcKH5mgc&vl=en

https://www.youtube.com/watch?v=tr5KR499QyQ

Pi Compressed Data function




SDT Encoder

以下为,IoTDB + SDT 设计

  • SDT 需要在当前encoder的上一层,对数据进行压缩,pageWriter 中会调用默认的编码方式,对压缩后的数据进行编码
  • SDT 编码,将支持四种数据类型:float, double, int, long
  • 查询解码时,因为返回的是经过sdt 过滤后的数据信息,不需要进行解码,与当前查询的方式一致,不进行更改
  • stats 统计信息,在pageWriter.write() 压缩后进行更新,chunk中的stats 信息是集合了page stats,所以在内存中丢掉数据,stats不会有不一致现象

Image Added

TSEncoding

  • 在现有的编码类型基础上,将会增加五种编码类型
    • 若用户使用sdt 编码,根据数据类型设置encoding enum,  gorilla_sdt, plain_sdt, .... 
    • 若用户不使用sdt 编码,根据数据类型设置encoding enum,  gorilla, plain, ...

Image Added

ChunkWriterImpl

以下write 接口,都会进行 是否为sdtEncoding 的判断

public void write(long time, int value)

public void write(long time, float value)

public void write(long time, long value)

public void write(long time, double value)


Image Added

  • 刷入磁盘

PageReader 

需要提供的变量

  • private Decoder sdtDecoder;
  • private ByteBuffer buffer;

查询将分为两种

  1. 不需要解压的查询
    1. 范围查询

Select s1 where time > 3 and time < 10

Group by

… 

  1. 需要解压进行插值计算的查询
    1. 点查询

Select s1 where time = 3

      1. 需要注意 time = 3 的前后两点,是否跨page
      2. 调用 SDTDecoder 
        1. S1 time = 3 的数据点经过压缩,并存在了磁盘,直接返回该点
        2. S1 time = 3 的数据点不存在,找到s1 time = 3 的前后两个数据点并进行插值计算

点查询

  1. Ie time = x,page1, page2, page3 ..
    1. 若x < page1.startime || x > page3.endtime
      1. return null
      2. 在该ts 第一个和最后一个时间戳之外,不进行插值计算
    2. 若x >= page2.startime && x <= page2.endtime
      1. 遍历pageReaders 的时候,记录 maxPrevPair, minNextPair 
      2. 若存在time = 3 的数据点,直接返回
      3. 若最后遍历完getValue == null,则通过 maxPrevPair, minNextPair 进行插值计算
    3. 若x >= page2.endtime && x <= page3.starttime
      1. 遍历pageData 的时候,记录maxPrevPage, minNextPage
      2. 若生成pageReaders.size == 0,则通过 maxPrevPage, minNextPage 进行unpack page,decoder 返回插值计算的结果

范围查询

  • 与当前逻辑一致,直接返回压缩后的结果集,不进行插值计算

聚合查询

...

一条数据的查询

  • 与当前一致,返回压缩后的结果集

多条数据的查询

...