You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 21 Next »

名词解释

顺序空间:顺序数据文件所在的空间

乱序空间:乱序数据文件所在的空间

层级合并(在顺序空间或乱序空间内部执行)

不存在对齐时间序列时的层级合并流程

  • 根据 seq_file_num_in_each_level / unseq_file_num_in_each_level 获取此次待合并层级中的 TsFileResource,作为待合并文件列表
  • 对待合并文件列表中的每个文件,创建一个 TsFIleSequenceReader,并存在 ReaderCache 中,并获取该文件的设备列表,生成所有的 device 集合
  • 遍历device集合
    • 根据 ReaderCache 得到相应的 TsFileSequenceReader
    • 根据 device 读取各个文件对应的 ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出 max_degree_of_index_node 个 sensor 及其对应的 ChunkMetadata 列表)
    • 遍历 算法1 输出的每批待合并的 sensor 列表
      • 对于待合并列表中的每一个 sensor
        • 如果是乱序空间的文件合并,采取 反序列化Page合并算法
        • 如果是顺序空间的文件合并
          • 如果某个 Chunk 的数据点数小于 merge_page_point_number,采取 反序列化Page合并算法
          • 否则,采取 追加Page合并算法
    • 关闭 ReaderCache 中所有的 reader
    • 序列化新文件的 TsFileResource
    • 关闭新文件 writer


算法1

输入:多个文件的 ChunkMetadataListIterator,Iterator 每次输出的 List 内的 sensor 个数为 max_index_degree(假设有3个文件:file1:(s1,s2)(s3,s4)   file2:(s1, s3)  file3:(s3,s4)

输出:每轮待合并的 sensor 列表

描述:每个迭代器取1个 List,找到每个 List 的最大字典序的 sensor,组成集合 S,本次合并从头开始合并到 S 中最小字典序的 sensor 。并从 List 中清除已合并的 sensor,如果此 List 消耗完,获取下一个 Lst

优势:每次至少会消耗完一个 file 的一个 List


第一轮:file1(s1,s2)+file2(s1,s3)+file3(s3,s4) → (s1,s2)

第二轮:file1(s3, s4)+file2(s3)+file3(s3,s4)  → (s3)

第三轮:file1(s4)+file3(s4) → (s4)


反序列化Page合并算法

  • 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
  • 遍历上述的time-value列表,将数据写入新的ChunkWriter
  • 判断限流
  • ChunkWriter写入新文件


追加Page合并算法

  •  通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新ChunkChunkMetadata
  • 判断限流
  • 将合并完的ChunkChunkMetadata写入新文件


存在对齐时间序列时的层级合并流程

  • 根据 seq_file_num_in_each_level / unseq_file_num_in_each_level 获取此次待合并层级中的 TsFileResource,作为待合并文件列表
  • 对待合并文件列表中的每个文件,创建一个 TsFIleSequenceReader,并存在 ReaderCache 中,并获取该文件的设备列表,生成所有的 device 集合
  • 遍历device集合
    • 根据 ReaderCache 得到相应的TsFileSequenceReader
    • 根据 device 读取各个文件对应的ChunkMetadataListIterator
    • 循环算法2输出的待合并的 IMeasurementSchema 列表
      • 对于待合并的 IMeasurementSchema 
      • 如果是乱序的文件合并,采取 存在对齐时间序列的反序列化 Page 合并算法
      • 如果是顺序空间的文件合并
        • 如果某个 Chunk(如果是对齐时间序列,判断 TimeChunk) 的数据点数小于 merge_page_point_number,采取 存在对齐时间序列的反序列化 Page 合并算法
        • 如果page足够大,采取 存在对齐时间序列的追加 Page 合并算法
    • 关闭 ReaderCache 中所有的reader
    • 序列化新文件的 TsFileResource
    • 关闭新文件writer


算法2

输入:多个文件的 ChunkMetadataListIterator,Iterator 每次输出的 List 内的 sensor 个数(包含对齐时间序列内的序列数,不包含 time)为 max_degree_of_index_node(由于对齐时间序列是个整体,因此可能会超过部分)

输出:每轮待合并的 sensor 列表

描述:每个迭代器取1个 List,找到每个 List 的最大字典序(对齐时间序列按 time 的名字 $#$id 来比较)的 sensor,组成集合 S,本次合并从头开始合并到 S 中最小字典序的 sensor 。并从 List 中清除已合并的 sensor,如果此 List 消耗完,获取下一个 Lst

优势:每次至少会消耗完一个 file 的一个 List


Iterator 每次输出的 List 的例子:文件内的序列为:(time, s4,s5,s6), (time, s9, s10), s1,s2,s3,s7,s8

如果 max_degree_of_index_node 为 2, 每次取出的为:(time, s4,s5,s6); (time, s9, s10); s1,s2; s3,s7; s8

如果 max_degree_of_index_node 为 3,每次取出的为:(time, s4,s5,s6); (time, s9, s10), s1; s2, s3,s7; s8

如果 max_degree_of_index_node 为 4,每次取出的为:(time, s4,s5,s6), (time, s9, s10); s1 s2, s3,s7; s8


存在对齐时间序列的反序列化 Page 合并算法

  • 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
    • 如果是MeasurementSchema,使用ChunkReader
    • 如果是VectorMeasurementSchema,使用VectorChunkReader
  • 遍历上述的time-value列表,将数据写入新的IChunkWriter
    • 如果是MeasurementSchema,使用ChunkWriterImpl
    • 如果是VectorMeasurementSchema,使用VectorChunkWriterImpl
  • 判断限流
  • 将IChunkWriter写入新文件


存在对齐时间序列的追加 Page 合并算法

  •  通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新ChunkChunkMetadata
    • 如果是MeasurementSchema,仅需要前一个chunk和后一个chunk进行合并
    • 如果是VectorMeasurementSchema,需要前一个VectorChunkMetadata对应的所有timeChunk和valueChunk与后一个所有的timeChunk和valueChunk合并
  • 判断限流
  • 将合并完的ChunkChunkMetadata写入新文件


例子:

VectorChunk1: timeChunk(page1,page2) s1Chunk(page3, page4) s2Chunk(page5)

VectorChunk2: timeChunk(page6) s1Chunk(page7) s2Chunk(page8)

合并后: timeChunk(page1, page2, page6) s1Chunk(page3, page4, page7), s2Chunk(page5, page8)

直接追加chunk合并

原流程存在的问题

对于层级合并来说(以顺序空间的层级合并为例),是通过seq_level_num和seq_file_num_in_each_level这两个参数来控制合并到最后的chunk大小的,即把原来的chunk扩大seq_file_num_in_each_levelseq_level_num-1倍,这种配置方案存在如下三个问题:

1、在配置时需要知道用户原先写出的一个chunk的大小,并根据实际经验来配置。这样的配置方式对用户来说难以自己上手配置,对于每一个用户来说,也都需要我们去观察文件并辅助配置,非常不方便

2、一旦用户使用了默认参数没有修改或者配置错误,而用户本身场景又不需要合并太多次文件,那么就会占用很多无效的磁盘IO,甚至降低查询效率

3、在一些场景中,不同时间序列的写入速度是不一样的,那么按照这种合并方式,最后会合并出很多大小不一的chunk,写入速度慢的时间序列对应的chunk小,写入速度快的时间序列对应的chunk大,造成chunk大小不均匀,难以控制

例子:

假设用户需要的目标chunk是4倍大小的原始chunk,有如下文件

0层:file1(s1,s2,s2)+file2(s2,s2,s2) 即file1有1个s1的chunk,2个s2的chunk,file2有3个s2的chunk

合并至1层:file3(s1(1),s2(5)) 合并出来的文件有1个s1的chunk,1个5倍大小的s2的chunk,假设他与另一个file4(s1(1),s2(5))进行合并

合并至2层:file5(s1(2),s2(9)) 合并出来的文件有1个2倍大小的s1的chunk,1个9倍大小的s2的chunk

可以看到s2的chunk越合并越大,已经远远超过用户需要的chunk大小,s1对应的chunk却仍没有达到用户的要求

解决方案

使用配置merge_chunk_point_number_threshold在合并每一个chunk的时候控制,

如果在待合并列表中这个sensor对应的所有chunk都已经达到了这个阈值,则不再合并chunk,直接将读出来的chunk写入新文件

跨文件空间合并(将乱序文件合并至顺序空间)

不存在对齐时间序列的消除乱序文件合并流程

输入一组乱序文件U,以及一组顺序文件S,最大同时合并的时间序列数量为n,单个chunk的点数阈值tpt,是否进行Full Merge

  1. merge.log文件记录各乱序文件文件名,各顺序文件文件名,并记录“merge start”。
  2. 遍历US中各文件的metadata,获取其中包含的所有时间序列集合T
  3. 对于T中的每一个设备device,选出该设备的至多n条时间序列(目前按字典序选出)Tc进行merge,如果所有时间序列都被merge,转9;
  4. merge.log中记录“{Tc} start;
  5. 对于Tc中的每一条时间序列tsi,在U上构建ts的一个MergeReader记做ri;
  6. 对于Tc中的每一条时间序列tsi,构建一个ChunkWritter记做wi;
  7. 对于S中的每一个文件sj,创建一个merge临时数据文件s'j;
  8. 对于S中的每一个文件sj

              7.1. 开启一个新的ChunkGroup

              7.2. 对于Tc中的每一条时间序列tsi,在S上查询它们的Chunk,并将这些Chunk按照在文件中的位置排序。

              7.3. 取出位置最小的Chunk,记做c,如果已经没有Chunk,转7.4

                                7.3.1 找到c对应的MergeReader ri,如果ri当前的数据点的时间小于等于c的结束时间end­­­,将ri所有时间不大于end­­­的点和c合并,并将c写入到wi,转7.3.5

                     7.3.2 如果c的点数小于tpt或者c的上一个Chunk已经写入到wi,但是wi还没有进行flush,将c写入到wi,转7.3.5

                     7.3.3 如果进行Full Merge,将c写入到wi,转7.3.5;

                     7.3.4 记录这个没有被mergeChunk,转7.3

                     7.3.5 如果wi中已写入的点大于等于tpt,将wi flushs'j,转7.3;

              7.4 关闭这个ChunkGroup

  1. 对于每个在处理Tc的过程中写入过新Chunk的文件si', 在merge.log中记录“{ si'的文件名} {si'的当前长度}”,在merge.log中记录“{Tc} end”,转2
  2. merge.log中记录“all ts end
  3. 对于S中的每个文件si,如果存在si',则:

              10.1 如果si中标记已被合并的chunk占总chunk的比例大于某值threshold

                     10.1.0 merge.log中记录“{si'} start {si'的当前长度}

10.1.1 si中未标记已被合并的chunk写入si'

10.1.2 si'生成FileMetadata并写入到si'尾部;

10.1.3 等待对si的所有查询结束,并对i加锁;

10.1.4si'替代si

10.1.5 merge.log中记录“{si} end”, 对si解锁;

       10.2 否则:

              10.2.1 等待对si的所有查询结束,并对i加锁;

              10.2.2 si的尾部的FileMetadata截去;

10.2.3 merge.log中记录“{si} {si的当前长度}

              10.2.4 si'中的数据写到si尾部;

              10.2.5 si生成FileMetadata并写入到si尾部,该FileMetadata中不包含被标记已经合并的chunk,但是记录有多少chunk被标为已经合并;

                     10.2.6 merge.log中记录“{si} end”,删除si',对si解锁;

       11. 在merge.log中记录“merge end”,删除U中的所有文件和merge.log


存在对齐时间序列的消除乱序文件合并流程

  • 从MManager中的取出改storageGroup所有的device→IMeasurementSchema
  • 遍历device列表 (去掉了sensor组的概念)
    • 遍历顺序文件列表,对于每一个顺序文件
      • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为1000个普通MeasurementSchema或超过1000个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
      • 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据 
        • 对于每一个IMeasurementSchema建立IMeasurementSchema→chunkMetadataList的列表
        • 读取每一个chunkMetadataList,建立List<List<Chunk>> chunks的结构
          • 如果是ChunkMetadata,将当前chunk读出包裹一个List放入chunks
          • 如果是VectorChunkMetadata,将当前chunk按timeChunk, valueChunk1,...valueChunkN 的顺序包裹一个List放入chunks
      • 对于每一个IMeasurementSchema→chunkMetadataList列表进行按合并子任务并行配置进行分组
        • 对于每个子任务的IMeasurementSchema→chunkMetadataList列表
          • 按文件顺序遍历chunks列表的chunkList,对于chunkList的第一个chunk, 如果有与unseq文件overlapped数据,则插入IChunkWriter
            • 如果IMeasurementSchema是MeasurementSchema,此时chunk列表的第一个chunk也是完整的chunk,直接写第一个chunk的数据写入ChunkWriterImpl
            • 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有本行数据写入 VectorChunkWriterImpl
          • 如果遍历完该unseq文件还有剩余的数据,则直接append到IChunkWriter后面
            • 如果IMeasurementSchema是MeasurementSchema,直接append第一个chunk的剩余数据
            • 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有剩余数据按行写入 VectorChunkWriterImpl


  • No labels