Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

名词解释

顺序空间:顺序数据文件所在的空间

乱序空间:乱序数据文件所在的空间

层级合并(在顺序空间或乱序空间内部执行)

不存在对齐时间序列时的层级合并流程

层级合并

本文档描述“在合并策略已经选定了待合并的文件列表后”的流程。

...

  • 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
  • 创建 TsFIleSequenceReader 缓存 ReaderCache
  • 遍历device集合
    • 根据文件从文件读取器cache中得到相应的根据 ReaderCache 得到相应的TsFileSequenceReader
    • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出 metadataIterator(ChunkMetadataListIterator每次按字典序吐出 max_degree_of_index_degree node 个sensor及其对应的ChunkMetadata列表)
    • 遍历算法1输出的每批待合并的sensor列表输出的每批待合并的 sensor 列表
      • 对于待合并列表中的每一个sensor
        • 如果是乱序空间的文件合并,采取 反序列化Page合并算法
        • 如果是顺序空间的文件合并 如果是顺序空间的文件合并
          • 如果某个 Chunk 的数据点数小于 merge_page_point_number,采取 反序列化Page合并算法
          • 如果page足够大,采取 否则,采取 追加Page合并算法
    • 关闭 reader cache ReaderCache 中所有的reader
    • 序列化新文件的 TsFileResource
    • 关闭新文件writer

...

输入:多个文件的 ChunkMetadataListIterator(假设有3个文件:file1Iterator,Iterator 每次输出的 List 内的 sensor 个数为 max_index_degree(假设有3个文件:file1:(s1,s2)(s3,s4)   file2:(s1, s3)  file3:(s3,s4)

...

  •  通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新ChunkChunkMetadata
  • 判断限流
  • 将合并完的ChunkChunkMetadata写入新文件

Vector引入的问题

1、现在的合并流程是按sensor合并的,如果这个sensor其实是某个vector的一部分,那么使用deserialize合并读不出vector中留空占位的数据,写入时就会破坏原来排列整齐的vector,而append合并没有影响。

2、ChunkMetadataListIterator是根据sensor吐出ChunkMetadataList的,这些sensor与MeasurementSchema是一一对应的。

但是现在有了VectorMeasurementSchema,故一个IMeasurementSchema可能对应多个sensor,而现在的MManager又不支持查询一个sensor是不是在某个vector内,所以没有办法把按sensor合并改成按 VectorMeasurementSchema合并。

修改方案

将流程改为


存在对齐时间序列时的层级合并流程

  • 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
  • 创建 TsFIleSequenceReader 缓存 ReaderCache
  • 遍历device集合
    • 从MManager中遍历该device对应的所有IMeasurementSchema
      • 如果是 MeasurementSchema(单sensor)
        • 走原来流程
      • 如果是 VectorMeasurementSchema (多sensor)
        • 通过各自文件的reader有序把数据读出并整理出对应的sensor-time-value列表
        • 对这些sensor的time取并集,记为allTime
        • 根据sensor遍历 sensor-time-value 列表
          • 遍历allTime 
            • 遍历上述的sensor-time-value列表,将数据写入新的VectorChunkWriter,如果没有这个时间戳,则传isNull
          • 判断限流
          • 将VectorChunkWriter写入新文件
      • 如果是乱序空间的文件合并,采取deserialize合并
        • 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
        • 遍历上述的time-value列表,将数据写入新的ChunkWriter
        • 判断限流
        • ChunkWriter写入新文件
      • 如果是顺序空间的文件合并
        • 如果page足够大,采取append合并
          •  通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk点而有序合并出对应的新ChunkChunkMetadata
          • 判断限流
          • 将合并完的ChunkChunkMetadata写入新文件
        • 如果page不足够大,采取deserialize合并
          • 通过各自文件的reader有序把数据读出并整理出对应的sensor-time-value列表
          • 对这些sensor的time取并集,记为allTime
          • 根据sensor遍历 sensor-time-value 列表
            • 遍历allTime 
              • 遍历上述的sensor-time-value列表,将数据写入新的VectorChunkWriter,如果没有这个时间戳,则传isNull
            • 判断限流
            • 将VectorChunkWriter写入新文件
    • 关闭reader cache中所有的reader
    • 序列化新文件
    • 关闭新文件writer

问题

如此修改去掉了如下过程:

  • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)

CPU占用率会提高。

并且因为要同时读多个sensor对应chunk的所有数据,内存占用会变高。

改进的流程

    • 根据 ReaderCache 得到相应的TsFileSequenceReader
    • 根据 device 读取各个文件对应的ChunkMetadataListIterator
    • 循环算法2输出的待合并的 IMeasurementSchema 列表
      • 对于待合并的 IMeasurementSchema 
      • 如果是乱序的文件合并,采取 存在对齐时间序列的反序列化 Page 合并算法
      • 如果是顺序空间的文件合并
        • 如果某个 Chunk(如果是对齐时间序列,判断 TimeChunk) 的数据点数小于 merge_page_point_number,采取 存在对齐时间序列的反序列化 Page 合并算法
        • 如果page足够大,采取 存在对齐时间序列的追加 Page 合并算法
    • 关闭 ReaderCache 中所有的reader
    • 序列化新文件的 TsFileResource
    • 关闭新文件writer


算法2

输入:多个文件的 ChunkMetadataListIterator,Iterator 每次输出的 List 内的 sensor 个数(包含对齐时间序列内的序列数,不包含 time)为 max_degree_of_index_node(由于对齐时间序列是个整体,因此可能会超过部分)

输出:每轮待合并的 sensor 列表

描述:每个迭代器取1个 List,找到每个 List 的最大字典序(对齐时间序列按 time 的名字 $#$id 来比较)的 sensor,组成集合 S,本次合并从头开始合并到 S 中最小字典序的 sensor 。并从 List 中清除已合并的 sensor,如果此 List 消耗完,获取下一个 Lst

优势:每次至少会消耗完一个 file 的一个 List


Iterator 每次输出的 List 的例子:文件内的序列为:(time, s4,s5,s6), (time, s9, s10), s1,s2,s3,s7,s8

如果 max_degree_of_index_node 为 2, 每次取出的为:(time, s4,s5,s6); (time, s9, s10); s1,s2; s3,s7; s8

如果 max_degree_of_index_node 为 3,每次取出的为:(time, s4,s5,s6); (time, s9, s10), s1; s2, s3,s7; s8

如果 max_degree_of_index_node 为 4,每次取出的为:(time, s4,s5,s6), (time, s9, s10); s1 s2, s3,s7; s8


存在对齐时间序列的反序列化 Page 合并算法

  • 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
  • 遍历device集合
  • 根据文件从文件读取器cache中得到相应的TsFileSequenceReader
  • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为256个普通MeasurementSchema或超过256个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
  • 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据,对每一次读取后的数据做如下操作对ChunkMetadataListIterator吐出的数据做归并排序
  • 对于合并出来的每一个IMeasurementSchema及其对应的ChunkMetadata列表
  • 如果是乱序空间的文件合并,采取deserialize合并通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
    • 如果是MeasurementSchema,使用ChunkReader
    • 如果是VectorMeasurementSchema,使用VectorChunkReader
  • 遍历上述的time-value列表,将数据写入新的IChunkWriter
    • 如果是MeasurementSchema,使用ChunkWriterImpl
    • 如果是VectorMeasurementSchema,使用VectorChunkWriterImpl
  • 判断限流
  • 将IChunkWriter写入新文件 如果是顺序空间的文件合并


存在对齐时间序列的追加 Page 合并算法

  • 如果page足够大,采取append合并 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新ChunkChunkMetadata
    • 如果是MeasurementSchema,仅需要前一个chunk和后一个chunk进行合并
    • 如果是VectorMeasurementSchema,需要前一个VectorChunkMetadata对应的所有timeChunk和valueChunk与后一个所有的timeChunk和valueChunk合并
  • 判断限流
  • 将合并完的ChunkChunkMetadata写入新文件
  • 如果page不足够大,采取deserialize合并
    • 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
      • 如果是MeasurementSchema,使用ChunkReader
      • 如果是VectorMeasurementSchema,使用VectorChunkReader
    • 遍历上述的time-value列表,将数据写入新的IChunkWriter
      • 如果是MeasurementSchema,使用ChunkWriterImpl
      • 如果是VectorMeasurementSchema,使用VectorChunkWriterImpl
    • 判断限流
    • 将IChunkWriter写入新文件
  • 关闭reader cache中所有的reader
  • 序列化新文件
  • 关闭新文件writer

跨文件空间合并

原来的流程


例子:

VectorChunk1: timeChunk(page1,page2) s1Chunk(page3, page4) s2Chunk(page5)

VectorChunk2: timeChunk(page6) s1Chunk(page7) s2Chunk(page8)

合并后: timeChunk(page1, page2, page6) s1Chunk(page3, page4, page7), s2Chunk(page5, page8)



跨文件空间合并(将乱序文件合并至顺序空间)

不存在对齐时间序列的消除乱序文件合并流程

  • 从MManager中的取出该 storageGroup 所有的device→sensor
  • 遍历device列表 
    • 按组遍历device对应的sensor列表
  • 从MManager中的取出改storageGroup所有的device→sensor
  • 遍历device列表 
    • 按组遍历device对应的sensor列表(这里的组是最大同时合并的时间序列数量n)
      • 遍历顺序文件列表,对于每一个顺序文件
        • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
        • 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据 
          • 对于每一个sensor建立sensor→chunkMetadataList的列表
        • 对于每一个sensor→chunkMetadataList列表进行按合并子任务并行配置进行分组
          • 对于每个子任务的sensor→chunkMetadataList列表
            • 遍历每一个chunkMetadata对应的Chunk,如果有与unseq文件overlapped数据,则插入
            • 如果遍历完该unseq文件还有剩余的数据,则直接append到结果文件后面

Vector引入的问题

1、如果合并的某个sensor是vector的一部分,又在有些时间戳上是空值,乱序合并后会把空值全部去除,导致vector不再整齐。

修改方案

  • 从MManager中的取出改storageGroup所有的device→IMeasurementSchema
  • 遍历device列表 
    • 按组遍历device对应的IMeasurementSchema列表(这里的组是最大同时合并的时间序列数量n)
      • 遍历顺序文件列表,对于每一个顺序文件 
        • 建立IMeasurementSchema->sensor->chunkMetaList的列表
        • 对于每一个IMeasurementSchema列表进行按合并子任务并行配置进行分组
          • 对于每个子任务的IMeasurementSchema→sensor→chunkMetadataList列表
            • 通过各自文件的reader有序把数据读出并整理出对应的sensor-time-value列表
            • 对这些sensor的time取并集,记为allTime
            • 根据sensor遍历 sensor-time-value 列表
              • 遍历allTime 
                • 遍历上述的sensor-time-value列表,将数据写入新的VectorChunkWriter,如果没有这个时间戳,则传isNull
              • 判断限流
              • 将VectorChunkWriter写入新文件

问题

如此修改去掉了如下过程:

  • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)

CPU占用率会提高。

并且因为要同时读多个sensor以及本次选出的顺序乱序文件对应chunk的所有数据,内存占用会变高。

...


存在对齐时间序列的消除乱序文件合并流程

  • 从MManager中的取出改storageGroup所有的device→IMeasurementSchema
  • 遍历device列表 (去掉了sensor组的概念)
    • 遍历顺序文件列表,对于每一个顺序文件
      • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为1000个普通MeasurementSchema或超过1000个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
      • 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据 
        • 对于每一个IMeasurementSchema建立IMeasurementSchema→chunkMetadataList的列表
        • 读取每一个chunkMetadataList,建立List<List<Chunk>> chunks的结构
          • 如果是ChunkMetadata,将当前chunk读出包裹一个List放入chunks
          • 如果是VectorChunkMetadata,将当前chunk按timeChunk, valueChunk1,...valueChunkN 的顺序包裹一个List放入chunks
      • 对于每一个IMeasurementSchema→chunkMetadataList列表进行按合并子任务并行配置进行分组
        • 对于每个子任务的IMeasurementSchema→chunkMetadataList列表
          • 按文件顺序遍历chunks列表的chunkList,对于chunkList的第一个chunk, 如果有与unseq文件overlapped数据,则插入IChunkWriter
            • 如果IMeasurementSchema是MeasurementSchema,此时chunk列表的第一个chunk也是完整的chunk,直接写第一个chunk的数据写入ChunkWriterImpl
            • 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有本行数据写入 VectorChunkWriterImpl
          • 如果遍历完该unseq文件还有剩余的数据,则直接append到IChunkWriter后面
            • 如果IMeasurementSchema是MeasurementSchema,直接append第一个chunk的剩余数据
            • 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有剩余数据按行写入 VectorChunkWriterImpl

...