Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • 从新到老遍历顺序文件列表 for s in seqFiles
    • s 正在合并:
      • continue
    • if s 是第一个文件
      • 按照文件版本从老到新遍历乱序文件 for u in unseqFiles:
        • if u.startTime() > s.endTime()
          • if u 未关闭 || u 正在合并
            • if UF.size() > 0
              • 提交一个碎文件清理任务,UF=[]
            • break
          • UF.add(u)
      • if UF.size() > 0
        • 提交一个碎片清理任务,UF = []
    • if s 是最后一个文件
      • 按照文件版本从老到新遍历乱序文件 for u in unseqFiles:
        • if u.endTime() < s.startTime()
          • if u 未关闭 || u 正在合并
            • if UF.size() > 0
              • 提交一个碎文件清理任务,UF=[]
            • break
          • UF.add(u)
      • if UF.size() > 0
        • 提交一个碎片清理任务,UF = []
      • break
    • 找到 s 的下一个文件 nextS
    • UF = []
    • 按照文件版本从老到新遍历乱序文件 for u in unseqFiles:
      • if u.startTime > = nextS.endTime && u.endTime  < s.startTime
        • if u 未关闭 || u 正在合并
          • if UF.size() > 0
            • 提交一个碎文件清理任务,UF=[]
          • break
        • UF.add(u)
    • 如果 UF.size() > 0
      • 提交一个碎文件清理任务,UF=[]

...

  • (firstSeqFile == null && secondSeqFile != null) || (firstSeqFile != null && secondSeqFile == null ) || (firstSeqFile != null && secondSeqFile != null) 
  • unseqFileList 中的文件版本号依次底层,且全局的乱序文件列表中不存在与 selectedUnseqFileList 中的文件重叠且版本号更小的乱序文件。
  • 当 firstSeqFile != null 且 secondSeqFile == null 时,要求 selectedUnseqFileList 中的所有文件的 startTime 都大于 firstSeqFile 的 endTime
  • 当 firstSeqFile == null 且 secondSeqFile != null 时,要求 selectedUnseqFileList 中的所有文件的 endTime 都小于 secondSeqFile 的 startTime
  • 当 firstSeqFile 和 secondSeqFile 都不为空时,firstSeqFile.startTime > secondSeqFile.endTime,且对于 selectedUnseqFileList 中的任意一个乱序文件 u, firstSeqFile.startTime > u.endTime > u.startTime > secondSeqFile.endTime

执行

  • 获取 UnseqTsFileList 中的所有设备和传感器 deviceSensorsMap(Map<Device, List<Sensor>>)
  • 将文件根据重叠情况整理成簇List<List<TsFileResource>> 将文件根据重叠情况整理成簇List<List<TsFileResource>> unseqFileClusters
  • for fileCluster in unseqFileCluster
    • if fileCluster.size() == 1
      • if fileCluster 中仅有的这个文件大小 > cross_space_fragment_file_target_size size && 没有未关闭的临时文件 
        • 将该文件进行重命名后移动到顺序文件空间中
      • else
        • 将该文件的内容解压缩写到一个临时的文件中(如果有未关闭的临时文件,则写到未关闭的文件中)
        • 判断临时文件的大小,如果够大就将其封口然后重命名并且移动到顺序文件列表中
      • continue
    • else
      • 将 fileCluster 中的这个 unseqFile 中的 unseqFile 合并到一个临时文件中(执行流程就是乱序空间内文件合并的流程)(如果有未关闭的临时文件,则放到未关闭的临时文件中;否则新建一个临时文件),然后将该临时文件经过重命名后移动到顺序文件列表中


重命名规则

输入:一个要移动到顺序文件列表中的乱序文件或者临时文件 file

  • 将 file 的名字改成  (后一个顺序文件的时间戳-1)-(后一个顺序文件的版本号)-0-1.tsfile
  • 如果存在同名文件,则改成(后一个顺序文件的时间戳-1)-(系统最新版本号)-0-1.tsfile,同时更新系统最新版本号
  • else
    • 将该文件的内容写到一个临时的文件中,不关闭该文件
  • continue
  • else将 fileCluster 中的 unseqFile 合并到一个临时文件中(如果有未关闭的临时文件,则放到未关闭的临时文件中;否则新建一个临时文件),然后将该临时文件经过重命名后(规则与前面 if 语句中的重命名过程相同)移动到顺序文件列表中

合并多个 unseqFile

参考空间内文件合并执行流程

输入:待合并的乱序文件列表

  • deviceSet=[]
  • 如果有未关闭的临时文件,那么 writer = 未关闭的临时文件的 RestorableWriter,否则 writer = 新建的临时文件的 RestorableWriter
  • for tsfile in TsFileList
    • 从 FileReaderManager 中获取此 tsfile 的 TsFileSequenceReader,并读出该文件的设备列表,添加至 deviceSet
  • for device in deviceSet
    • metadataIteratorList = []
    • for tsfile in TsFileList
      • 读取 tsfile 中 device 对应的 ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出 max_degree_of_index_node 个 sensor 及其对应的 ChunkMetadata 列表)
      • 将 ChunkMetadataListIterator 添加到 metadataIteratorList
    • 遍历 算法1 输出的每批待合并的 sensor 列表
      • 对于待合并列表中的每一个 sensor
        • 采取 反序列化Page合并算法
  • for tsfile in TsFileList
    • 将 FileReaderManager 中此文件 decreaseReaderReference
  • 序列化新文件的 TsFileResource
  • 关闭新文件 writer

算法1:

输入:多个文件的 ChunkMetadataListIterator(iterator),Iterator 每次输出的 List 内的 sensor 个数为 max_index_degree(假设有3个文件:file1:(s1,s2),(s3)   file2:(s1, s3)  file3:(s1,s2),(s3,s5)

...