Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

...


空间内合并执行流程(针对一元时间序列)

顺序空间:顺序数据文件所在的空间

乱序空间:乱序数据文件所在的空间

层级合并(在顺序空间或乱序空间内部执行)

不存在对齐时间序列时的层级合并流程

输入:待合并文件列表 TsFileList

  • deviceSet=[]
  • writer = 目标文件 RestorableTsFileIOWriter
  • for tsfile in TsFileList
    • 从 FileReaderManager 中获取此 tsfile 的 TsFileSequenceReader,并读出该文件的设备列表,添加至 deviceSet
  • for device in deviceSet
    • metadataIteratorList = []
    • for tsfile in TsFileList
  • 根据 seq_file_num_in_each_level / unseq_file_num_in_each_level 获取此次待合并层级中的 TsFileResource,作为待合并文件列表
  • 对待合并文件列表中的每个文件,创建一个 TsFIleSequenceReader,并存在 ReaderCache 中,并获取该文件的设备列表,生成所有的 device 集合
  • 遍历device集合
    • 根据 ReaderCache 得到相应的 TsFileSequenceReader
    • 根据 device 读取各个文件对应的 
      • 读取 tsfile 中 device 对应的 
      • ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出 max_degree_of_index_node 个 sensor 及其对应的 ChunkMetadata 列表)
      • 将 ChunkMetadataListIterator 添加到 metadataIteratorList
    • 遍历 算法1 输出的每批待合并的 sensor 列表
      • 对于待合并列表中的每一个 sensor
        • 如果是乱序空间的文件合并,采取 反序列化Page合并算法
        • 如果是顺序空间的文件合并
          • 如果某个 Chunk 的数据点数小于 merge_page_point_number,采取 反序列化Page合并算法
          • 否则,采取 追加Page合并算法
  • for tsfile in TsFileList
    • 将 FileReaderManager 中此文件 decreaseReaderReference关闭 ReaderCache 中所有的 reader
  • 序列化新文件的 TsFileResource
  • 关闭新文件 writer



算法1算法1:

输入:多个文件的 ChunkMetadataListIterator,Iterator Iterator(简称iterator),Iterator 每次输出的 List 内的 sensor 个数为 max_index_degree(假设有3个文件:file1_Iterator 返回两批 sensor_list:(s1,s2),(s3,s4)   file2) , file2_iterator:(s1, s3)  file3_iterator:(s1,s2),(s3,s4s5)

输出:每轮待合并的 sensor 列表

输出:List<sensor>

描述:每个 iterator 取1个 描述:每个迭代器取1个 List,找到每个 List 的最大字典序的 sensor,组成集合 S,本次合并从头开始合并到 S 中最小字典序的 sensor_lex_min,直到所有都合并完 。并从 List 中清除已合并的 sensor,如果此 List 消耗完,获取下一个 Lst

...

第一轮:file1(s1,s2)+file2(s1,s3)+file3(s3s1,s4s2)  → → S=(s1s2, s2)s3),最小字典序 sensor_lex_min 是 s2,输出 (s1,s2)

第二轮:file1(s3, s4)+file2(s3)+file3(s3,s4)  → s5)  → S=(s3, s5, s5)第三轮:file1(s4)+file3(s4) → (s4),最小字典序 sensor_lex_min 是 s3, 输出(s3)

第三轮:file3(s5) → S=(s5),最小字典序 sensor_lex_min 是 s5,输出(s5)




反序列化Page合并算法

  • 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
  • 遍历上述的time-value列表,将数据写入新的ChunkWriter
  • 判断限流
  • ChunkWriter写入新文件

...

如果在待合并列表中这个sensor对应的所有chunk都已经达到了这个阈值,则不再合并chunk,直接将读出来的chunk写入新文件

...

跨文件空间合并(将乱序文件合并至顺序空间)(针对一元时间序列)

不存在对齐时间序列的消除乱序文件合并流程

输入一组乱序文件U,以及一组顺序文件S,最大同时合并的时间序列数量为n,单个chunk的点数阈值tpt,是否进行Full Merge

...

              7.1. 开启一个新的ChunkGroup

              7.2. 对于Tc中的每一条时间序列tsi,在S上查询它们的Chunk,并将这些Chunk按照在文件中的位置排序。

              7.3. 取出位置最小的Chunk,记做c,如果已经没有Chunk,转7.4

                                7.3.1 找到c对应的MergeReader ri,如果ri当前的数据点的时间小于等于c的结束时间end­­­,将ri所有时间不大于end­­­的点和c合并,并将c写入到wi,转7.3.5

                     7.3.2 如果c的点数小于tpt或者c的上一个Chunk已经写入到wi,但是wi还没有进行flush,将c写入到wi,转7.3.5

                     7.3.3 如果进行Full Merge,将c写入到wi,转7.3.5;

                     7.3.4 记录这个没有被mergeChunk,转7.3

                     7.3.5 如果wi中已写入的点大于等于tpt,将wi flushs'j,转7.3;

              7.4 关闭这个ChunkGroup

...

FullMerge(完全合并)

出于IO和合并性能考虑,乱序合并不会重写整个顺序文件,获取一个整理完全的顺序文件,而是会将与乱序文件重叠的 chunk 合并重写后,追加到原顺序文件后面,不删除原chunk,称这个过程为 原地合并

但是过多的 原地合并 会造成无用的数据越来越多,甚至在某些情况下导致一个文件大小无限制地增大(见线上问题分析)

完全合并 则是在乱序合并时重写整个顺序文件,IO和合并性能较低,但能获取一个整理完全的顺序文件


重新设计整个合并流程,舍弃 原地合并 只保留 完全合并

外部并行

执行拆分流程:device→seqFile→sensor


输入:待合并文件列表 SeqTsFileList, UnseqTsFileList

  • 获取 SeqTsFileList和UnseqTsFileList 中的所有设备和传感器 deviceSensorsMap(Map<Device, List<Sensor>>)
  • 新建一个 Map<TsFileResource, RestorableFileWriter> newWriterCache;
  • 新建一个 Map<TsFileResource, TsFileResource> newTsFileResourceCache;
  • for device, sensors in deviceSensorsMap
    • 为 sensors 构建一个 bitMap,记录是否被合并,默认全部为 false
    • for seqFile in  SeqTsFileList
      • 从 newTsFileResourceCache 中获取,若不存在则新建一个 TsFileResource writer 
      • 从 newWriterCache 中获取,若不存在则新建一个 RestorableFileWriter writer 
      • writer.startChunkGroup(device)
      • 对于 seqFile 新建 ChunkMetadataListIterator
      • ChunkMetadataListIterator 中迭代获取该设备的 sensorList 及其对应的 List<ChunkMetadata> sensorChunkMetadataList(每次返回 max_degree_of_index_node 个 sensor),并获取 ModificationList,对于每一个迭代
        • for sensor, sensorChunkMetadataList:
          • 如果 sensorChunkMetadataList 不为空
            • 根据 算法1 将该 sensor 的 Chunk 与对应的 unseqReader 中的数据进行合并
            • 将该 sensor 在 bitMap 上的位置为 true
      • 如果当前的 seqFile 是最后一个 seqFile:
        • 将该 device 下在 bitMap 中记录未合并的 sensor 对应的 unseqReader 的数据写入这个 seqFile 的临时文件中
        • for unseqReader in unseqReaderList
          • if unseqReader 未读完
            • 新建一个 ChunkWriter 
            • 根据 算法3 将未读完的 unseqReader 的剩余数据写入 ChunkWriter 中
            • 将 ChunkWriter 写入到writer
      • writer.endChunkGroup()
  • for tsFileResource in newTsFileResourceCache
    • tsFileResource.serialize()
    • tsFileResource.close()
  • for writer in newWriterCache
    • writer.endFile()

算法1

输入:sensor, sensorChunkMetadataList, unseqReader, tsFileResource, modification

  • for chunkMetadata in sensorChunkMetadataList
    • chunk = readMemChunk(chunkMetadata)
    • unclosedChunkPoint = 0L
    • tsFileResource.updateStartTime(chunkMetadata.startTime)
    • tsFileResource.updateEndTime(chunkMetadata.endTime)
    • 判断当前 chunk 是否被修改 modified
    • if isOverlap(chunkMetadata)
      • 将其与乱序数据重叠的部分合并后写入 ChunkWriter(见 算法2
      • unclosedChunkPoint+=写入的点数
    • else if isChunkTooSmall(chunkMetadata)
      • 将这个 Chunk 解压缩后写入 ChunkWriter
      • unclosedChunkPoint+=写入的点数
    • else
      • if  unclosedChunkPoint > 0 || modified
        • 将这个 Chunk 解压缩后写入 ChunkWriter
      • else 
        • 将这个 Chunk 不解压缩写入 writer
    • if unclosedChunkPoint > merge_chunk_point_num_threshold
      • 将 ChunkWriter 写入 writer
      • unclosedChunkPoint = 0

算法2

输入:chunk, unseqReader, deviceEndTime, tsFileResource, modification

  • 对 Chunk 构建一个 ChunkReader
  • 使用 ChunkReader 获取 Chunk 中的每一个 Page
    • while pageData.hasNext() 
      • 获取 pageData 当前的时间戳 seqTime
      • overwriteSeqPoint = false
      • while 该 sensor 还有乱序数据并且乱序数据的下一个时间戳小于 seqTime
        • 将乱序数据写入 ChunkWrtier 中
        • tsFileResource.updateStartTime(乱序数据)
        • tsFileResource.updateEndTime(乱序数据)
        • 如果乱序数据的时间戳 == seqTime
          • overwriteSeqPoint = true
      • 根据 算法3 将该 unseqReader 小于等于 deviceEndTime 的剩余数据写入 ChunkWriter 中
      • if !overwriteSeqPoint && !pageData 当前时间戳在 modification 中被删除
        • 将 pageData 当前的数据点写入 ChunkWriter 中
        • tsFileResource.updateStartTime(乱序数据)
        • tsFileResource.updateEndTime(乱序数据)

算法3

输入:chunkWriter, unseqReader, timeLimit, tsFileResource

  • while unseqReader 还有数据 && 数据时间戳<=timeLimit
    • 将 unseqReader 当前数据写入 chunkWriter
    • tsFileResource.updateStartTime(乱序数据)
    • tsFileResource.updateEndTime(乱序数据)

...

              10.1 如果si中标记已被合并的chunk占总chunk的比例大于某值threshold

                     10.1.0 merge.log中记录“{si'} start {si'的当前长度}

10.1.1 si中未标记已被合并的chunk写入si'

10.1.2 si'生成FileMetadata并写入到si'尾部;

10.1.3 等待对si的所有查询结束,并对i加锁;

10.1.4si'替代si

10.1.5 merge.log中记录“{si} end”, 对si解锁;

       10.2 否则:

              10.2.1 等待对si的所有查询结束,并对i加锁;

              10.2.2 si的尾部的FileMetadata截去;

10.2.3 merge.log中记录“{si} {si的当前长度}

              10.2.4 si'中的数据写到si尾部;

              10.2.5 si生成FileMetadata并写入到si尾部,该FileMetadata中不包含被标记已经合并的chunk,但是记录有多少chunk被标为已经合并;

                     10.2.6 merge.log中记录“{si} end”,删除si',对si解锁;

       11. 在merge.log中记录“merge end”,删除U中的所有文件和merge.log

存在对齐时间序列的消除乱序文件合并流程

  • 从MManager中的取出改storageGroup所有的device→IMeasurementSchema
  • 遍历device列表 (去掉了sensor组的概念)
    • 遍历顺序文件列表,对于每一个顺序文件
    • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为1000个普通MeasurementSchema或超过1000个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
    • 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据 
      • 对于每一个IMeasurementSchema建立IMeasurementSchema→chunkMetadataList的列表
      • 读取每一个chunkMetadataList,建立List<List<Chunk>> chunks的结构
        • 如果是ChunkMetadata,将当前chunk读出包裹一个List放入chunks
        • 如果是VectorChunkMetadata,将当前chunk按timeChunk, valueChunk1,...valueChunkN 的顺序包裹一个List放入chunks
    • 对于每一个IMeasurementSchema→chunkMetadataList列表进行按合并子任务并行配置进行分组对于每个子任务的IMeasurementSchema→chunkMetadataList列表
    • 按文件顺序遍历chunks列表的chunkList,对于chunkList的第一个chunk, 如果有与unseq文件overlapped数据,则插入IChunkWriter
      • 如果IMeasurementSchema是MeasurementSchema,此时chunk列表的第一个chunk也是完整的chunk,直接写第一个chunk的数据写入ChunkWriterImpl
      • 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有本行数据写入 VectorChunkWriterImpl
    • 如果遍历完该unseq文件还有剩余的数据,则直接append到IChunkWriter后面
    • 如果IMeasurementSchema是MeasurementSchema,直接append第一个chunk的剩余数据
    • 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有剩余数据按行写入 VectorChunkWriterImpl