场景与目标

本设计解决的目标场景:

(1)时间序列条数多(亿级)

(2)每条时间序列写入负载平均(极端场景,此场景下缓存最易失效)


每次刷盘数据量

根据时序数据库数据文件存储结构调研中的分析,每次刷盘的数据量为:

1亿时间序列,其中设备有100万个,每个设备下有100个工况,每条时间序列刷盘时包含10个数据点


痛点问题

参考调研文档时序数据库数据文件存储结构调研中的定量分析部分,可知当前文件格式每一个chunk和page都存储了统计信息和header,而在海量时间序列场景下,每条时间序列每次刷盘的数据不超过10个点,也就是一个page中有10个点,每个chunk中只有一个page。在这种情况下,数据点少时统计信息加速效果不大,统计信息和header占用了过多空间,存在空间冗余。


概要设计

Page和Chunk合并

根据每次刷盘数据量、实验:TsFile 索引区#(II)ExperimentaboutcombineChunkandPage中(II) Experiment about combine Chunk and Page的分析及源码分析,有如下结论:

(1)每条时间序列刷盘后仅生成一个page,tsfile对于这种情况不会保存page statistic,关键代码如下:

if (first) {
  sizeWithoutStatistic +=
      ReadWriteForEncodingUtils.writeUnsignedVarInt(uncompressedSize, pageBuffer);
  sizeWithoutStatistic +=
      ReadWriteForEncodingUtils.writeUnsignedVarInt(compressedSize, pageBuffer);
} else {
  ReadWriteForEncodingUtils.writeUnsignedVarInt(uncompressedSize, pageBuffer);
  ReadWriteForEncodingUtils.writeUnsignedVarInt(compressedSize, pageBuffer);
  statistics.serialize(pageBuffer);
}


(2)每条时间序列刷盘后仅生成一个chunk,tsfile对于这种情况不会保存chunk statistic,关键代码如下:

boolean serializeStatistic = (chunkMetadataList.size() > 1);


(3)Timeseries metadata中保存的信息如下:

byte timeSeriesMetadataType;
String measurementID;
varInt chunkMetaDataListDataSize;


chunk header中保存的header信息如下:

byte marker;
varInt data_size;
String measurementID; // redundant, can remove
byte datatype;
byte compressionType;
byte encoding type

page header中保存的header信息如下:

varInt uncompressed_size;
varInt compressed_size;

观察可知三者的header并不冲突,合并并不能节省空间


综上所述,目前的tsfile对于单chunk单page已经做了元数据去冗余的优化,本设计没必要合并Page和Chunk,ChunkHeader中保存了一份的measurement ID信息,而TimeseriesMetadata中保存了measurement ID,存在冗余,可以删除,需要修改TsFileSequenceReader中如下接口:

// old interface
private ChunkHeader readChunkHeader(long position, int chunkHeaderSize);

// new interface
private ChunkHeader readChunkHeader(long position, int chunkHeaderSize, String measurementID)


去除统计信息

Statistic

增加一个byte标识,标记其是否含有统计信息块,进而决定序列化和反序列化的方式

TsfileResource

增加一个byte标识,标记其是否含有统计信息块

查询适配

修改StorageGroupProcessor中的getFileResourceListForQuery方法,对于无统计信息的Tsfile生成一个特殊的TsfileResource,在loadChunk时自动生成统计信息块,进而复用原有查询代码


优缺点分析:

优点

(1)节省存储空间,每序列10个点的情况下,移除统计信息大约能节省28%的空间;每序列1个点的情况下,移除统计信息大约能节省45%的空间

(2)提高写入性能:写入性能约提高2-3%

缺点

(1)查询无法使用统计信息进行过滤和加速聚合

(2)删除chunk header里面的measurementID后,Tsfile无法顺序恢复chunk,需要重做所有WAL


实验

实验场景

模拟海量时间序列一次刷盘1%的数据量,即100万时间序列,其中有1万个设备,每个设备下有100个工况,每条时间序列写10个点,生成tsfile

每条时间序列的类型为long,值随机生成,编码算法为RLE,压缩算法为SNAPPY或GZIP

去除统计信息前后文件大小比较

每序列10个点(SNAPPY)

文件部分 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
数据区(Timeseries metadata之前的部分)126.24 MB126.24 MB126.24 MB126.24 MB
元数据区(Timeseries metadata之后的部分)79.17 MB38.17 MB22.17 MB12.25 MB
总大小205.41 MB164.41 MB148.41 MB138.49 MB
总大小比例100 %80.04 %72.25 %67.42 %

每序列1个点(SNAPPY)

文件部分 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
数据区(Timeseries metadata之前的部分)48.99 MB48.99 MB48.99 MB48.99 MB
元数据区(Timeseries metadata之后的部分)79.16 MB38.16 MB22.16 MB12.24 MB
总大小128.15 MB87.15 MB71.15 MB61.23 MB
总大小比例100 %68.01 %55.52 %47.80 %

每序列10个点(GZIP)

文件部分 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
数据区(Timeseries metadata之前的部分)140.11 MB140.11 MB140.11 MB140.11 MB
元数据区(Timeseries metadata之后的部分)79.17 MB38.17 MB22.17 MB12.25 MB
总大小219.28 MB178.28 MB162.28 MB152.36 MB
总大小比例100 %81.30 %74.01 %69.48 %

每序列1个点(GZIP)

文件部分 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
数据区(Timeseries metadata之前的部分)62.03 MB62.03 MB62.03 MB62.03 MB
元数据区(Timeseries metadata之后的部分)79.16 MB38.16 MB22.16 MB12.25 MB
总大小141.20 MB100.20 MB84.20 MB74.28 MB
总大小比例100 %70.96 %59.63 %52.61 %

去除统计信息前后写入时间比较 (SSD)

CPU:i5-5257U CPU @ 2.70GHz 

每序列10个点 (SNAPPY)

写入速度 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
写入时间39.858 s39.575 s39.674 s38.948 s
写入时间比100 % 99.29 %99.54 %97.72 %

每序列1个点 (SNAPPY)

写入速度 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
写入时间37.771 s37.401 s37.023 s36.993 s
写入时间比100 %99.02 %98.02 %97.94 %

每序列10个点 (GZIP)

写入速度 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
写入时间54.053 s54.042 s53.782 s53.465 s
写入时间比100 % 99.98 %99.52 %98.91 %

每序列1个点 (GZIP)

写入速度 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
写入时间49.911 s49.701 s49.384 s49.224 s
写入时间比100 %99.58 %98.94 %98.47 %


去除统计信息前后写入时间比较 (HDD)

CPU:i7-10700 CPU @ 2.90GHz

每序列10个点 (SNAPPY)

写入速度 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
写入时间8.887 s8.837 s8.729 s8.627 s
写入时间比100 % 99.44 %98.22 %97.07 %

每序列1个点 (SNAPPY)

写入速度 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
写入时间8.182 s8.023 s8.092 s7.998 s
写入时间比100 %98.06 %98.90 %97.75 %

每序列10个点 (GZIP)

写入速度 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
写入时间

18.591 s

18.293 s17.992 s18.003 s
写入时间比100 % 98.40 %96.78 %96.83 %

每序列1个点 (GZIP)

写入速度 / 文件所含内容原始文件去掉值统计信息去除全部统计信息去除全部统计信息及measurementID
写入时间16.392 s16.113 s16.104 s15.982 s
写入时间比100 %98.30 %98.24 %97.50 %


实验结论

(1)存储空间:每序列10个点的情况下,移除统计信息大约能节省33%的空间;每序列1个点的情况下,移除统计信息大约能节省50%的空间

(2)写入速度:去除统计信息可以节省2-3%的写入时间,写入时间与CPU关系很大,按照磁盘写入速度,SSD写磁盘占用的时间不到整个写入时间的4%,HDD写磁盘占用的时间不到整个写入时间的8%

实验源码

写入程序

import java.io.File;
import java.util.Random;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;

/**
 * An example of writing data with TSRecord to TsFile It uses the interface: public void
 * addMeasurement(MeasurementSchema measurementSchema) throws WriteProcessException
 */
public class TsFileWriteTest {
  public static int deviceNum;
  public static int sensorNum;
  public static int fileNum;
  public static int pointNum = 10;

  public static void main(String[] args) {
    try {
      deviceNum = 1; // Integer.parseInt(cl.getOptionValue("d"));
      sensorNum = 1; // Integer.parseInt(cl.getOptionValue("m"));
      fileNum = 1; // Integer.parseInt(cl.getOptionValue("f"));
    } catch (Exception e) {
      e.printStackTrace();
    }

    for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) {
      try {
        String path =
            "/Users/SilverNarcissus/iotdb/tsfile_test"
                + "/withoutStat/"
                + deviceNum
                + "."
                + sensorNum
                + "/test"
                + fileIndex
                + ".tsfile";
        File f = FSFactoryProducer.getFSFactory().getFile(path);
        if (f.exists()) {
          f.delete();
        }

        try {
          TsFileWriter tsFileWriter = new TsFileWriter(f);
          for (int i = 1; i <= deviceNum; i++) {
            for (int j = 1; j <= sensorNum; j++) {
              Path path1 = new Path(Constant.DEVICE_PREFIX + i);
              tsFileWriter.registerTimeseries(
                  path1,
                  new UnaryMeasurementSchema(
                      Constant.SENSOR_ + j, TSDataType.INT64, TSEncoding.RLE));
            }
          }
          // construct TSRecord
          int count = 0;
          for (int j = 1; j <= deviceNum; j++) {
            for (int i = 1; i <= pointNum; i++) {
              TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + j);
              for (int t = 1; t <= sensorNum; t++) {
                DataPoint dPoint1 =
                    new LongDataPoint(Constant.SENSOR_ + t, new Random().nextLong());
                tsRecord.addTuple(dPoint1);
                count++;
              }
              // write TSRecord
              tsFileWriter.write(tsRecord);
              if (count % 100000 == 0) {
                System.out.println(count);
                tsFileWriter.flushAllChunkGroups();
              }
            }
          }
          tsFileWriter.close();
        } catch (Throwable e) {
          e.printStackTrace();
          System.out.println(e.getMessage());
        }

      } catch (Throwable e) {
        e.printStackTrace();
        System.out.println(e.getMessage());
      }
    }
  }
}

对Statistics.class中serialize方法的修改

保留全部统计信息

public int serialize(OutputStream outputStream) throws IOException {
   int byteLen = 0;
   byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(count, outputStream);
   byteLen += ReadWriteIOUtils.write(startTime, outputStream);
   byteLen += ReadWriteIOUtils.write(endTime, outputStream);
   // value statistics of different data type
   byteLen += serializeStats(outputStream);
   return byteLen;
}


仅保留起止时间统计信息

public int serialize(OutputStream outputStream) throws IOException {
   int byteLen = 0;
   byteLen += ReadWriteIOUtils.write(startTime, outputStream);
   byteLen += ReadWriteIOUtils.write(endTime, outputStream);
   return byteLen;
}


不保留任何统计信息

public int serialize(OutputStream outputStream) throws IOException {
   int byteLen = 0;
   return byteLen;
}
  • No labels