You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

选择过程

从新生成的顺序文件开始选择,对于每一个顺序文件

按版本顺序从老到新找到和该顺序文件重叠的所有乱序文件(不超过compaction_cross_space_max_select_unseq_file_num),如果没有乱序文件与之重叠,则跳过该顺序文件

当选中的顺序文件超过compaction_cross_space_max_select_seq_file_num,则将选中的顺序文件和乱序文件提交任务


文件选择算法

输入:顺序文件列表 seqFileList、乱序文件列表 unseqFileList

  • 将顺序文件列表按照时间顺序从新到老排序得到 seqFileListOrderByTime
  • 将乱序文件列表按照版本从老到新进行排序得到 unseqFileListOrderByVersion
  • selectedFiles=Map<TsFileResource, List<TsFileResource>>, selectedUnseqFiles=Set<TsFileResource>
  • hasSelect = false
  • 遍历顺序文件列表 for seqFile in seqFileListOrderByTime
    • 如果 seqFile 正在合并 || seqFile 未关闭
      • 如果 selectedFiles 非空
        • 提交一个合并任务,清空 selectedFiles 和 selectedUnseqFiles, hasSelect = true
        • if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
          • return hasSelect
      • continue
    • 遍历乱序文件列表 for unseqFile in unseqFileListOrderByVersion
      • if unseqFile 和 seqFile 有数据重叠
        • if unseqFile 未关闭 || unseqFile 正在合并
          • 如果 selectedFiles 非空
            • 提交一个合并任务,清空 selectedFiles, hasSelect = true
            • if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
              • return hasSelect
          • break
        • selectedFiles[seqFile].add(unseqFile)
        • selectedUnseqFiles.add(unseqFile)
        • if selectedUnseq.size() > compaction_cross_space_max_select_unseq_file_num
          • 提交一个合并任务,清空 selectedFiles, hasSelect = true
          • if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
            • return hasSelect
          • break
    • if selectedFiles.size() > compaction_cross_space_max_select_seq_file_num
      • 提交一个合并任务,清空 selectedFiles, hasSelect = true
      • if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
        • return hasSelect

执行过程

将这些文件进行跨文件空间的合并

正序遍历提交的顺序文件,对于每一个顺序文件

将其于相应的乱序文件中有重叠的部分进行合并并写入原顺序文件,保留整个乱序文件,并通过增加 .mods 文件以及修改 TsFileResource 来标记已经合并的部分

跨文件空间合并执行算法

输入:selectedFiles(Map<TsFileResource, List<TsFileResource>>)

  • 获取所有选中的顺乱序文件中的所有设备和传感器 deviceSensorsMap(Map<Device, List<Sensor>>)
  • for seqFile in selectedFiles.keySet()
    • 对 seqFile 建立一个 RestorableWriter
    • 获取与该 seqFile 进行合并的乱序文件列表 unseqFiles
    • 获取 unseqFiles 中所有的时间序列元数据 Map<Device, List<Sensor>> deviceSensorMap
    • for device, sensors in deviceChunkMetadataMap
      • restorableWriter.startChunkGroup(device) 
      • for sensor in sensors
        • 为这个 sensor 在所有的 unseqFiles 上建立一个 unseqReader
        • 根据算法1将数据重写到 seqFile 中
      • restorableWriter.endChunkGroup(device)
    • 在这些 unseqFiles 上建立一个 unseqReader
    • 通过算法1将 unseqReader 中的数据重写到 seqFile 中
    • 为unseqFiles增加 .mods 文件,并修改其 TsFileResource,抹去合并的数据



算法1


碎文件清理

设计一个碎文件清理任务,将没有与任何顺序文件发生重叠的乱序文件填充到顺序空间中,详情见碎文件清理任务


  • No labels