选择过程
从新生成的顺序文件开始选择,对于每一个顺序文件
按版本顺序从老到新找到和该顺序文件重叠的所有乱序文件(不超过compaction_cross_space_max_select_unseq_file_num),如果没有乱序文件与之重叠,则跳过该顺序文件
当选中的顺序文件超过compaction_cross_space_max_select_seq_file_num,则将选中的顺序文件和乱序文件提交任务
文件选择算法
输入:顺序文件列表 seqFileList、乱序文件列表 unseqFileList
- 将顺序文件列表按照时间顺序从新到老排序得到 seqFileListOrderByTime
- 将乱序文件列表按照版本从老到新进行排序得到 unseqFileListOrderByVersion
- selectedFiles=Map<TsFileResource, List<TsFileResource>>, selectedUnseqFiles=Set<TsFileResource>
- hasSelect = false
- 遍历顺序文件列表 for seqFile in seqFileListOrderByTime
- 如果 seqFile 正在合并 || seqFile 未关闭
- 如果 selectedFiles 非空
- 提交一个合并任务,清空 selectedFiles 和 selectedUnseqFiles, hasSelect = true
- if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
- return hasSelect
- continue
- 如果 selectedFiles 非空
- 遍历乱序文件列表 for unseqFile in unseqFileListOrderByVersion
- if unseqFile 和 seqFile 有数据重叠
- if unseqFile 未关闭 || unseqFile 正在合并
- 如果 selectedFiles 非空
- 提交一个合并任务,清空 selectedFiles, hasSelect = true
- if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
- return hasSelect
- break
- 如果 selectedFiles 非空
- selectedFiles[seqFile].add(unseqFile)
- selectedUnseqFiles.add(unseqFile)
- if selectedUnseq.size() > compaction_cross_space_max_select_unseq_file_num
- 提交一个合并任务,清空 selectedFiles, hasSelect = true
- if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
- return hasSelect
- break
- if unseqFile 未关闭 || unseqFile 正在合并
- if unseqFile 和 seqFile 有数据重叠
- if selectedFiles.size() > compaction_cross_space_max_select_seq_file_num
- 提交一个合并任务,清空 selectedFiles, hasSelect = true
- if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
- return hasSelect
- 如果 seqFile 正在合并 || seqFile 未关闭
执行过程
将这些文件进行跨文件空间的合并
正序遍历提交的顺序文件,对于每一个顺序文件
将其于相应的乱序文件中有重叠的部分进行合并并写入原顺序文件,保留整个乱序文件,并通过增加 .mods 文件以及修改 TsFileResource 来标记已经合并的部分
跨文件空间合并执行算法
输入:selectedFiles(Map<TsFileResource, List<TsFileResource>>)
- 获取所有选中的顺乱序文件中的所有设备和传感器 deviceSensorsMap(Map<Device, List<Sensor>>)
- for seqFile in selectedFiles.keySet()
- 对 seqFile 建立一个 RestorableWriter
- 获取与该 seqFile 进行合并的乱序文件列表 unseqFiles
- 获取 unseqFiles 中所有的时间序列元数据 Map<Device, List<Sensor>> deviceSensorMap
- for device, sensors in deviceChunkMetadataMap
- restorableWriter.startChunkGroup(device)
- for sensor in sensors
- 为这个 sensor 在所有的 unseqFiles 上建立一个 unseqReader
- 根据算法1将数据重写到 seqFile 中
- restorableWriter.endChunkGroup(device)
- 在这些 unseqFiles 上建立一个 unseqReader
- 通过算法1将 unseqReader 中的数据重写到 seqFile 中
- 为unseqFiles增加 .mods 文件,并修改其 TsFileResource,抹去合并的数据
算法1
碎文件清理
设计一个碎文件清理任务,将没有与任何顺序文件发生重叠的乱序文件填充到顺序空间中,详情见碎文件清理任务