选择过程
从新生成的顺序文件开始选择,对于每一个顺序文件
按版本顺序从老到新找到和该顺序文件重叠的所有乱序文件(不超过compaction_cross_space_max_select_unseq_file_num),如果没有乱序文件与之重叠,则跳过该顺序文件
当选中的顺序文件超过compaction_cross_space_max_select_seq_file_num,则将选中的顺序文件和乱序文件提交任务
文件选择算法
输入:顺序文件列表 seqFileList、乱序文件列表 unseqFileList
- 将顺序文件列表按照时间顺序从新到老排序得到 seqFileListOrderByTime
- 将乱序文件列表按照版本从老到新进行排序得到 unseqFileListOrderByVersion
- selectedFiles=Map<TsFileResource, List<TsFileResource>>
- hasSelect = false
- 遍历顺序文件列表 for seqFile in seqFileListOrderByTime
- 如果 seqFile 正在合并 || seqFile 未关闭
- 如果 selectedFiles 非空
- 提交一个合并任务,清空 selectedFiles 和 selectedUnseqFiles, hasSelect = true, ++currentTaskNum
- if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
- return hasSelect
- continue
- 如果 selectedFiles 非空
- 遍历乱序文件列表 for unseqFile in unseqFileListOrderByVersion
- if unseqFile 和 seqFile 有数据重叠
- if unseqFile 未关闭 || unseqFile 正在合并
- 如果 selectedFiles 非空
- 提交一个合并任务,清空 selectedFiles, hasSelect = true, ++currentTaskNum
- if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
- return hasSelect
- break
- 如果 selectedFiles 非空
- selectedFiles[seqFile].add(unseqFile)
- if selectedFiles[seqFile].size() > compaction_cross_space_max_select_unseq_file_num
- 提交一个合并任务,清空 selectedFiles, hasSelect = true, ++currentTaskNum
- if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
- return hasSelect
- break
- if unseqFile 未关闭 || unseqFile 正在合并
- if unseqFile 和 seqFile 有数据重叠
- if selectedFiles.size() > compaction_cross_space_max_select_seq_file_num
- 提交一个合并任务,清空 selectedFiles, hasSelect = true, ++currentTaskNum
- if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
- return hasSelect
- 如果 seqFile 正在合并 || seqFile 未关闭
判断顺序文件与乱序文件是否发生重叠
输入:seqFile、unseqFile
- return (seqFile.startTime >= unseqFile.startTime && seqFile.startTime <= unseqFile.endTime) || (seqFile.endTime >= unseqFile.startTime && seqFile.endTime <= unseqFile.endTime)
执行过程
将这些文件进行跨文件空间的合并
正序遍历提交的顺序文件,对于每一个顺序文件
将其于相应的乱序文件中有重叠的部分进行合并并写入原顺序文件,保留整个乱序文件,并通过增加 .mods 文件以及修改 TsFileResource 来标记已经合并的部分
跨文件空间合并执行算法
输入:selectedFiles(Map<TsFileResource, List<TsFileResource>>)
- for seqFile in selectedFiles.keySet()
- 对 seqFile 建立一个 RestorableWriter
- 获取与该 seqFile 进行合并的乱序文件列表 unseqFiles
- 获取该 seqFile 的元数据列表 deviceChunkMetadataMap(Map<Device, Map<Sensor, List<ChunkMetadata>>>)
- 获取 unseqFiles 中所有的时间序列元数据 Map<Device, List<Sensor>> deviceSensorMap
- for device, sensors in deviceChunkMetadataMap
- restorableWriter.startChunkGroup(device)
- for sensor in sensors
- 为这个 sensor 在所有的 unseqFiles 上建立一个 unseqReader
- 根据算法1将数据重写到 seqFile 中
- restorableWriter.endChunkGroup(device)
- 处理 unseqFiles,将合并过的数据抹去
- 为unseqFiles,修改其 TsFileResource,抹去合并的数据
修改 unseqFile 的 TsFileResource
- unseqFile.startTime < seqFile.startTime && unseqFile.endTime > seqFile.endTime,将这个文件根据文件拆分算法拆成两个文件
- unseqFile.startTime < seqFile.startTime && unseqFile.endTime < seqFile.endTime,将 unseqFile.endTime 修改为 seqFile.startTime, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
- unseqFile.startTime > seqFile.startTime && unseqFile.endTime > seqFile.endTime,将 unseqFile.startTime 修改为 seqFile.endTime, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
- unseqFile.startTime > seqFile.startTime && unseqFile.endTime < seqFile.endTime,将 unseqFile 删除, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
文件拆分算法
输入:unseqFile,seqFileStartTime, seqFileEndTime
- 为 unseqFile 建立 Reader
- 获取 unseqFile 中的所有元数据 Map<Device, Map<Sensor, List<ChunkMetadata>> deviceSensorsMetadata
- 新建 fileWriterBefore 和 fileWriterAfter
- for device, sensors in deviceSensorsMetadata
- fileWriterBefore.startChunkGroup(device), fileWriterAfter.startChunkGroup(device)
- 新建 chunkWriterBefore, chunkWriterAfter
- hasStartChunkGroupBefore = false, hasStartChunkGroupAfter = false
- for sensor in sensors
- 找到这个 sensor 对应的 chunkMetadataList
- for chunkMetadata in chunkMetadataList
- 获取对应的 Chunk
- if Chunk.startTime >= seqFile.startTime && chunk.endTime <= seqFile.endTime
- continue
- else if Chunk.endTime < seqFile.startTime
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- fileWriterBefore.writeChunk(chunk)
- else if Chunk.startTime > seqFile.endTime
- if !hasStartChunkGroupAfter : fileWriterAfter.startChunkGroup(device), hasStartChunkGroupAfter=true
- fileWriterAfter.writeChunk(chunk)
- else
- 为这个 Chunk 构建一个 ChunkReader
- while ChunkReader.hasNext() && chunkReader.currentTimestamp < seqFile.startTime
- chunkWriterBefore.write(chunkReader.next())
- if chunkWriterBefore.size > flush_threshold
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterBefore to fileWriterBefore
- while ChunkReader.hasNext() && chunkReader.currentTimestamp <= seqFile.endTime
- chunkReader.next()
- while chunkReader.hasNext():
- chunkWriterAfter.write(chunkReader.next())
- if chunkWriterAfter.size > flush_threshold
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterAfter to fileWriterAfter
- if chunkWriterAfter.size > 0
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterAfter to fileWriterAfter
- if chunkWriterBefore.size > 0
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterBefore to fileWriterBefore
- if hasStartChunkGroupBefore: fileWriterBefore.endChunkGroup(device)
- if hasStartChunkGroupAfter: fileWriterAfter.endChunkGroup(device)
- 将拆分后的两个文件分别命名为 {原文件时间戳 - 1}-{原文件版本号}-{0}-{1}.tsfile 和 {原文件时间戳 - 1}-{原文件版本号}-{0}-{1}.tsfile,并删除原文件
算法1
输入:待合并的 sensor
- if 这个 sensor 存在于 deviceChunkMetadataMap 中
- 从 deviceChunkMetadataMap 中获取该 sensor 在 seqFile 中的 sensorChunkMetadataList
- for chunkMetadata in sensorChunkMetadataList
- chunk = readMemChunk(chunkMetadata)
- unclosedChunkPoint = 0L
- tsFileResource.updateStartTime(chunkMetadata.startTime)
- tsFileResource.updateEndTime(chunkMetadata.endTime)
- 判断当前 chunk 是否被修改 modified
- if isOverlap(chunkMetadata)
- 将其与乱序数据重叠的部分合并后写入 ChunkWriter(见 算法2)
- unclosedChunkPoint+=写入的点数
- else if isChunkTooSmall(chunkMetadata)
- 将这个 Chunk 解压缩后写入 ChunkWriter
- unclosedChunkPoint+=写入的点数
- else
- if unclosedChunkPoint > 0 || modified
- 将这个 Chunk 解压缩后写入 ChunkWriter
- else
- 将这个 Chunk 不解压缩写入 writer
- if unclosedChunkPoint > 0 || modified
- if unclosedChunkPoint > merge_chunk_point_num_threshold
- 将 ChunkWriter 写入 writer
- unclosedChunkPoint = 0
- 否则
- 通过算法3将乱序数据写入 seqFile 中
算法2
输入:chunk, unseqReader, deviceEndTime, tsFileResource, modification
- 对 Chunk 构建一个 ChunkReader
- 使用 ChunkReader 获取 Chunk 中的每一个 Page
- while pageData.hasNext()
- 获取 pageData 当前的时间戳 seqTime
- overwriteSeqPoint = false
- while 该 sensor 还有乱序数据并且乱序数据的下一个时间戳小于 seqTime
- 将乱序数据写入 ChunkWrtier 中
- tsFileResource.updateStartTime(乱序数据)
- tsFileResource.updateEndTime(乱序数据)
- 如果乱序数据的时间戳 == seqTime
- overwriteSeqPoint = true
- if !overwriteSeqPoint && !pageData 当前时间戳在 modification 中被删除
- 将 pageData 当前的数据点写入 ChunkWriter 中
- tsFileResource.updateStartTime(乱序数据)
- tsFileResource.updateEndTime(乱序数据)
- while pageData.hasNext()
算法3
输入:unseqReader、restorableWriter、seqFile
- 创建一个 ChunkWriter
- while unseqReader.currentTimestamp < seqFile.startTime && unseqReader.hasNext()
- unseqReader.next()
- while unseqReader.currentTimestamp >= seqFile.startTime && unseqReader.currentTimestamp <= seqFile.endTime && unseqReader.hasNext()
- 将 unseqReader 当前的时间点写入 ChunkWriter 中
- unseqReader.next()
- 如果 ChunkWriter 中的数据点的个数 > CHUNK_MIN_POINT_NUM
- flush ChunkWriter to restorableWriter
- 如果 ChunkWriter 中还有没刷盘的数据
- flush chunkWriter to restorableWriter
碎文件清理
设计一个碎文件清理任务,将没有与任何顺序文件发生重叠的乱序文件填充到顺序空间中,详情见碎文件清理任务