场景与目标
本设计解决的目标场景:
(1)时间序列条数多(亿级)
(2)每条时间序列写入负载平均(极端场景,此场景下缓存最易失效)
每次刷盘数据量
根据时序数据库数据文件存储结构调研中的分析,每次刷盘的数据量为:
1亿时间序列,其中设备有100万个,每个设备下有100个工况,每条时间序列刷盘时包含10个数据点
痛点问题
参考调研文档时序数据库数据文件存储结构调研中的定量分析部分,可知当前文件格式每一个chunk和page都存储了统计信息和header,而在海量时间序列场景下,每条时间序列每次刷盘的数据不超过10个点,也就是一个page中有10个点,每个chunk中只有一个page。在这种情况下,数据点少时统计信息加速效果不大,统计信息和header占用了过多空间,存在空间冗余。
概要设计
Page和Chunk合并
根据每次刷盘数据量、Experiments of TsFile index area中(II) Experiment about combine Chunk and Page的分析及源码分析,有如下结论:
(1)每条时间序列刷盘后仅生成一个page,tsfile对于这种情况不会保存page statistic,关键代码如下:
if (first) { sizeWithoutStatistic += ReadWriteForEncodingUtils.writeUnsignedVarInt(uncompressedSize, pageBuffer); sizeWithoutStatistic += ReadWriteForEncodingUtils.writeUnsignedVarInt(compressedSize, pageBuffer); } else { ReadWriteForEncodingUtils.writeUnsignedVarInt(uncompressedSize, pageBuffer); ReadWriteForEncodingUtils.writeUnsignedVarInt(compressedSize, pageBuffer); statistics.serialize(pageBuffer); }
(2)每条时间序列刷盘后仅生成一个chunk,tsfile对于这种情况不会保存chunk statistic,关键代码如下:
boolean serializeStatistic = (chunkMetadataList.size() > 1);
(3)Timeseries metadata中保存的信息如下:
byte timeSeriesMetadataType; String measurementID; // can remove varInt chunkMetaDataListDataSize;
chunk header中保存的header信息如下:
byte marker; varInt data_size; byte datatype; byte compressionType; byte encoding type
page header中保存的header信息如下:
varInt uncompressed_size; varInt compressed_size;
观察可知三者的header并不冲突,合并并不能节省空间
综上所述,目前的tsfile对于单chunk单page已经做了元数据去冗余的优化,本设计没必要合并Page和Chunk,但Timeseries metadata中保存了一份额外的measurement ID信息,这个信息完全可以从Timeseries index tree中读到,所以可以删除
去除统计信息
实验
实验场景
模拟海量时间序列一次刷盘1%的数据量,即100万时间序列,其中有1万个设备,每个设备下有100个工况,每条时间序列写10个点,生成tsfile
每条时间序列的类型为long,编码算法为RLE,压缩算法为SNAPPY
试试GZIP
去除统计信息前后文件大小比较
每序列10个点(SNAPPY)
文件部分 / 文件所含内容 | 原始文件 | 仅保留起止时间统计信息 | 去除全部统计信息 |
---|---|---|---|
数据区(Timeseries metadata之前的部分) | 126.24 MB | 126.24 MB | 126.24 MB |
元数据区(Timeseries metadata之后的部分) | 79.17 MB | 38.17 MB | 22.17 MB |
总大小 | 205.41 MB | 164.41 MB | 148.41 MB |
总大小比例 | 100 % | 80.04 % | 72.25 % |
每序列1个点(SNAPPY)
文件部分 / 文件所含内容 | 原始文件 | 仅保留起止时间统计信息 | 去除全部统计信息 |
---|---|---|---|
数据区(Timeseries metadata之前的部分) | 48.99 MB | 48.99 MB | 48.99 MB |
元数据区(Timeseries metadata之后的部分) | 79.16 MB | 38.16 MB | 22.16 MB |
总大小 | 128.15 MB | 87.15 MB | 71.15 MB |
总大小比例 | 100 % | 68.01 % | 55.52 % |
每序列10个点(GZIP)
文件部分 / 文件所含内容 | 原始文件 | 仅保留起止时间统计信息 | 去除全部统计信息 |
---|---|---|---|
数据区(Timeseries metadata之前的部分) | 140.11 MB | 140.11 MB | 140.11 MB |
元数据区(Timeseries metadata之后的部分) | 79.17 MB | 38.17 MB | 22.17 MB |
总大小 | 219.28 MB | 178.28 MB | 162.28 MB |
总大小比例 | 100 % | 81.30 % | 76.74 % |
每序列1个点(GZIP)
文件部分 / 文件所含内容 | 原始文件 | 仅保留起止时间统计信息 | 去除全部统计信息 |
---|---|---|---|
数据区(Timeseries metadata之前的部分) | 62.03 MB | 62.03 MB | 62.03 MB |
元数据区(Timeseries metadata之后的部分) | 79.16 MB | 38.16 MB | 22.16 MB |
总大小 | 141.20 MB | 100.20 MB | 84.20 MB |
总大小比例 | 100 % | 70.96 % | 59.63 % |
去除统计信息前后写入时间比较 (压缩方式为GZIP)
每序列10个点
写入速度 / 文件所含内容 | 原始文件 | 仅保留起止时间统计信息 | 去除全部统计信息 |
---|---|---|---|
写入时间 | 81.053 s | 79.042 s | 74.134 s |
写入时间比 | 100 % | 97.52 % | 91.46 % |
每序列1个点
写入速度 / 文件所含内容 | 原始文件 | 仅保留起止时间统计信息 | 去除全部统计信息 |
---|---|---|---|
写入时间 | 79.911 s | 72.701 s | 65.384 s |
写入时间比 | 100 % | 90.98 % | 81.82 % |
实验源码
写入程序
import java.io.File; import java.util.Random; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.write.TsFileWriter; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema; /** * An example of writing data with TSRecord to TsFile It uses the interface: public void * addMeasurement(MeasurementSchema measurementSchema) throws WriteProcessException */ public class TsFileWriteTest { public static int deviceNum; public static int sensorNum; public static int fileNum; public static int pointNum = 10; public static void main(String[] args) { try { deviceNum = 1; // Integer.parseInt(cl.getOptionValue("d")); sensorNum = 1; // Integer.parseInt(cl.getOptionValue("m")); fileNum = 1; // Integer.parseInt(cl.getOptionValue("f")); } catch (Exception e) { e.printStackTrace(); } for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) { try { String path = "/Users/SilverNarcissus/iotdb/tsfile_test" + "/withoutStat/" + deviceNum + "." + sensorNum + "/test" + fileIndex + ".tsfile"; File f = FSFactoryProducer.getFSFactory().getFile(path); if (f.exists()) { f.delete(); } try { TsFileWriter tsFileWriter = new TsFileWriter(f); for (int i = 1; i <= deviceNum; i++) { for (int j = 1; j <= sensorNum; j++) { Path path1 = new Path(Constant.DEVICE_PREFIX + i); tsFileWriter.registerTimeseries( path1, new UnaryMeasurementSchema( Constant.SENSOR_ + j, TSDataType.INT64, TSEncoding.RLE)); } } // construct TSRecord int count = 0; for (int j = 1; j <= deviceNum; j++) { for (int i = 1; i <= pointNum; i++) { TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + j); for (int t = 1; t <= sensorNum; t++) { DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_ + t, new Random().nextLong()); tsRecord.addTuple(dPoint1); count++; } // write TSRecord tsFileWriter.write(tsRecord); if (count % 100000 == 0) { System.out.println(count); tsFileWriter.flushAllChunkGroups(); } } } tsFileWriter.close(); } catch (Throwable e) { e.printStackTrace(); System.out.println(e.getMessage()); } } catch (Throwable e) { e.printStackTrace(); System.out.println(e.getMessage()); } } } }
对Statistics.class中serialize方法的修改
保留全部统计信息
public int serialize(OutputStream outputStream) throws IOException { int byteLen = 0; byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(count, outputStream); byteLen += ReadWriteIOUtils.write(startTime, outputStream); byteLen += ReadWriteIOUtils.write(endTime, outputStream); // value statistics of different data type byteLen += serializeStats(outputStream); return byteLen; }
仅保留起止时间统计信息
public int serialize(OutputStream outputStream) throws IOException { int byteLen = 0; byteLen += ReadWriteIOUtils.write(startTime, outputStream); byteLen += ReadWriteIOUtils.write(endTime, outputStream); return byteLen; }
不保留任何统计信息
public int serialize(OutputStream outputStream) throws IOException { int byteLen = 0; return byteLen; }