THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
将其于相应的乱序文件中有重叠的部分进行合并并写入原顺序文件,保留整个乱序文件,并通过增加 .mods 文件以及修改 TsFileResource 来标记已经合并的部分
跨文件空间合并执行算法
输入:selectedFiles(Map<TsFileResource, List<TsFileResource>>)
...
- unseqFile.startTime < seqFile.startTime && unseqFile.endTime > seqFile.endTime,将这个文件拆分成两份endTime,将这个文件根据文件拆分算法拆成两个文件
- unseqFile.startTime < seqFile.startTime && unseqFile.endTime < seqFile.endTime,将 unseqFile.endTime 修改为 seqFile.startTime, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
- unseqFile.startTime > seqFile.startTime && unseqFile.endTime > seqFile.endTime,将 unseqFile.startTime 修改为 seqFile.endTime, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
- unseqFile.startTime > seqFile.startTime && unseqFile.endTime < seqFile.endTime,将 unseqFile 删除, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
...
- 为 unseqFile 建立 Reader
- 获取 unseqFile 中的所有元数据 Map<Device, Map<Sensor, List<ChunkMetadata>> deviceSensorsMetadata
- 新建 fileWriterBefore 和 fileWriterAfter
- for device, sensors in deviceSensorsMetadata
- fileWriterBefore.startChunkGroup(device), fileWriterAfter.startChunkGroup(device)
- 新建 chunkWriterBefore, chunkWriterAfter
- hasStartChunkGroupBefore = false, hasStartChunkGroupAfter = false
- for sensor in sensors
- 找到这个 sensor 对应的 chunkMetadataList
- for chunkMetadata in chunkMetadataList
- 获取对应的 Chunk
- if Chunk.startTime >= seqFile.startTime && chunk.endTime <= seqFile.endTime
- continue
- else if Chunk.endTime < seqFile.startTime
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- fileWriterBefore.writeChunk(chunk)
- else if Chunk.startTime > seqFile.endTime
- if !hasStartChunkGroupAfter : fileWriterAfter.startChunkGroup(device), hasStartChunkGroupAfter=true
- fileWriterAfter.writeChunk(chunk)
- else
- 为这个 Chunk 构建一个 ChunkReader
- while ChunkReader.hasNext() && chunkReader.currentTimestamp < seqFile.startTime
- chunkWriterBefore.write(chunkReader.next())
- if chunkWriterBefore.size > flush_threshold
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterBefore to fileWriterBefore
- while ChunkReader.hasNext() && chunkReader.currentTimestamp <= seqFile.endTime
- chunkReader.next()
- while chunkReader.hasNext():
- chunkWriterAfter.write(chunkReader.next())
- if chunkWriterAfter.size > flush_threshold
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterAfter to fileWriterAfter
- if chunkWriterAfter.size > 0
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterAfter to fileWriterAfter
- if chunkWriterBefore.size > 0
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterBefore to fileWriterBefore
- fileWriterBeforeif hasStartChunkGroupBefore: fileWriterBefore.endChunkGroup(device), fileWriterAfter
- if hasStartChunkGroupAfter: fileWriterAfter.endChunkGroup(device)
- 将拆分后的两个文件分别命名为 {原文件时间戳 - 1}-{原文件版本号}-{0}-{1}.tsfile 和 {原文件时间戳 - 1}-{原文件版本号}-{0}-{1}.tsfile,并删除原文件
...