目的:将没有与任何顺序文件发生重叠的乱序文件填充到顺序空间中

碎文件清理任务提交

碎文件清理任务使用独立的线程池(不与空间内以及跨文件空间合并使用同一个线程池),在后台开启一个独立的线程进行定时提交。

碎文件清理任务选择流程

  • 从新到老遍历顺序文件列表 for s in seqFiles
    • s 正在合并:
      • continue
    • if s 是第一个文件
      • 按照文件版本从老到新遍历乱序文件 for u in unseqFiles:
        • if u.startTime() > s.endTime()
          • if u 未关闭 || u 正在合并
            • if UF.size() > 0
              • 提交一个碎文件清理任务,UF=[]
            • break
          • UF.add(u)
      • if UF.size() > 0
        • 提交一个碎片清理任务,UF = []
    • if s 是最后一个文件
      • 按照文件版本从老到新遍历乱序文件 for u in unseqFiles:
        • if u.endTime() < s.startTime()
          • if u 未关闭 || u 正在合并
            • if UF.size() > 0
              • 提交一个碎文件清理任务,UF=[]
            • break
          • UF.add(u)
      • if UF.size() > 0
        • 提交一个碎片清理任务,UF = []
      • break
    • 找到 s 的下一个文件 nextS
    • UF = []
    • 按照文件版本从老到新遍历乱序文件 for u in unseqFiles:
      • if u.startTime > nextS.endTime && u.endTime  < s.startTime
        • if u 未关闭 || u 正在合并
          • if UF.size() > 0
            • 提交一个碎文件清理任务,UF=[]
          • break
        • UF.add(u)
    • 如果 UF.size() > 0
      • 提交一个碎文件清理任务,UF=[]

非碎片重叠文件直接移动

场景

在将乱序文件嵌入到顺序文件列表时,存在这样的情况,一些乱序文件既没有与别的乱序文件重叠,也没有和顺序文件重叠,同时大小也比较合适,那么这时候就可以将这些乱序文件直接插到顺序文件列表中

例如在上图中,4号乱序文件和5号乱序文件既没有和顺序文件发生重叠,也没有和任何乱序文件发生重叠,那么就可以直接将它们经过重命名后移动到顺序文件空间中;3号乱序文件和6号乱序文件之间发生了重叠,需要将其进行合并后生成一个新文件插入到顺序文件空间中;7 号文件没有和顺序文件以及乱序文件产生重叠,也可以在重命名后移动到顺序文件空间中。


碎文件清理任务执行

输入:顺序文件 firstSeqFile, secondSeqFile,选中的乱序文件列表 selectedUnseqFileList

要求

  • (firstSeqFile == null && secondSeqFile != null) || (firstSeqFile != null && secondSeqFile == null ) || (firstSeqFile != null && secondSeqFile != null) 
  • unseqFileList 中的文件版本号依次底层,且全局的乱序文件列表中不存在与 selectedUnseqFileList 中的文件重叠且版本号更小的乱序文件。
  • 当 firstSeqFile != null 且 secondSeqFile == null 时,要求 selectedUnseqFileList 中的所有文件的 startTime 都大于 firstSeqFile 的 endTime
  • 当 firstSeqFile == null 且 secondSeqFile != null 时,要求 selectedUnseqFileList 中的所有文件的 endTime 都小于 secondSeqFile 的 startTime
  • 当 firstSeqFile 和 secondSeqFile 都不为空时,firstSeqFile.startTime > secondSeqFile.endTime,且对于 selectedUnseqFileList 中的任意一个乱序文件 u, firstSeqFile.startTime > u.endTime > u.startTime > secondSeqFile.endTime

执行

  • 将文件根据重叠情况整理成簇List<List<TsFileResource>> unseqFileClusters
  • for fileCluster in unseqFileCluster
    • if fileCluster.size() == 1
      • if fileCluster 中仅有的这个文件大小 > cross_space_fragment_file_target_size && 没有未关闭的临时文件 
        • 将该文件进行重命名后移动到顺序文件空间中
      • else
        • 将该文件的内容解压缩写到一个临时的文件中(如果有未关闭的临时文件,则写到未关闭的文件中)
        • 判断临时文件的大小,如果够大就将其封口然后重命名并且移动到顺序文件列表中
      • continue
    • else
      • 将 fileCluster 中的 unseqFile 合并到一个临时文件中(执行流程就是乱序空间内文件合并的流程)(如果有未关闭的临时文件,则放到未关闭的临时文件中;否则新建一个临时文件),然后将该临时文件经过重命名后移动到顺序文件列表中


将乱序文件整理成簇

输入:乱序文件列表 unseqFileList

输出:List<List<TsFileResource>> 文件簇列表,每个文件簇中的文件必然至少与同一簇中的一个文件发生了重叠

  • 将乱序文件根据 startTime 进行排序得到 unseqFileListOrderByTime

  • fileCluster = []
  • tmpCluster = []
  • for unseqFile in  unseqFileListOrderByTime
    • if tmpCluster.size() == 0
      • tmpCluster.add(unseqFile)
      • tmpClusterEndTime = unseqFile.endTime, tmpClusterStartTime=unseqFile.startTime
    • else
      • 如果 tmpCluster 和 unseqFile 发生了重叠(和文件重叠的判断相同)
        • tmpCluster.add(unseqFile)
        • tmpClusterEndTime = max(tmpClusterEndTime, unseqFile.endTime)
        • tmpClusterStartTime = max(tmpClusterStartTime, unseqFile.startTime)
      • else
        • fileCluster.add(tmpCluster)
        • tmpCluster=[]
        • tmpCluster.add(unseqFile)
        • tmpClusterEndTime = unseqFile.endTime, tmpClusterStartTime=unseqFile.startTime


重命名规则

输入:一个要移动到顺序文件列表中的乱序文件或者临时文件 file

  • 将 file 的名字改成  (后一个顺序文件的时间戳-1)-(后一个顺序文件的版本号)-0-1.tsfile
  • 如果存在同名文件,则改成(后一个顺序文件的时间戳-1)-(系统最新版本号)-0-1.tsfile,同时更新系统最新版本号

合并多个 unseqFile

参考空间内文件合并执行流程

输入:待合并的乱序文件列表

  • deviceSet=[]
  • 如果有未关闭的临时文件,那么 writer = 未关闭的临时文件的 RestorableWriter,否则 writer = 新建的临时文件的 RestorableWriter
  • for tsfile in TsFileList
    • 从 FileReaderManager 中获取此 tsfile 的 TsFileSequenceReader,并读出该文件的设备列表,添加至 deviceSet
  • for device in deviceSet
    • metadataIteratorList = []
    • for tsfile in TsFileList
      • 读取 tsfile 中 device 对应的 ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出 max_degree_of_index_node 个 sensor 及其对应的 ChunkMetadata 列表)
      • 将 ChunkMetadataListIterator 添加到 metadataIteratorList
    • 遍历 算法1 输出的每批待合并的 sensor 列表
      • 对于待合并列表中的每一个 sensor
        • 采取 反序列化Page合并算法
  • for tsfile in TsFileList
    • 将 FileReaderManager 中此文件 decreaseReaderReference
  • 序列化新文件的 TsFileResource
  • 关闭新文件 writer

算法1:

输入:多个文件的 ChunkMetadataListIterator(iterator),Iterator 每次输出的 List 内的 sensor 个数为 max_index_degree(假设有3个文件:file1:(s1,s2),(s3)   file2:(s1, s3)  file3:(s1,s2),(s3,s5)

输出:每轮待合并的 sensor 列表

描述:每个 iterator 取1个 List,找到每个 List 的最大字典序的 sensor,组成集合 S,本次合并从头开始合并到 S 中最小字典序的 sensor_lex_min_lex_min,直到所有都合并完 。并从 List 中清除已合并的 sensor,如果此 List 消耗完,获取下一个 Lst

优势:每次至少会消耗完一个 file 的一个 List

  • getNextSensors(List<ChunkMetadataListIterator> iterators)
  • getNextSensors(List<ChunkMetadataListIterator> iterators)


第一轮:file1(s1,s2)+file2(s1,s3)+file3(s1,s2) → S=S=(s2, s3),最小字典序 sensor_lex_min 是 s2 → 从 s2, s3),最小字典序 sensor_lex_min 是 s2 → 从 s1 合并到 合并到 s2

第二轮:file1(s3)+file2(s3)+file3(s3,s5)  → S=S=(s3, s5, s5),最小字典序 sensor_lex_min 是 s3 →  从 s3 合到 s3,最小字典序 sensor_lex_min 是 s3 →  从 s3 合到 s3

第三轮:file3(s5) → S=S=(s5),最小字典序 sensor_lex_min 是 s5 → 从 s5 合到 s5,最小字典序 sensor_lex_min 是 s5 → 从 s5 合到 s5


反序列化Page合并算法

  • 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
  • 遍历上述的time-value列表,将数据写入新的ChunkWriter
  • 判断限流
  • ChunkWriter写入新文件




  • No labels