THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
将其于相应的乱序文件中有重叠的部分进行合并并写入原顺序文件,保留整个乱序文件,并通过增加 .mods 文件以及修改 TsFileResource 来标记已经合并的部分
跨文件空间合并执行算法
输入:selectedFiles(Map<TsFileResource, List<TsFileResource>>)
...
- unseqFile.startTime < seqFile.startTime && unseqFile.endTime > seqFile.endTime,将这个文件拆分成两份endTime,将这个文件根据文件拆分算法拆成两个文件
- unseqFile.startTime < seqFile.startTime && unseqFile.endTime < seqFile.endTime,将 unseqFile.endTime 修改为 seqFile.startTime, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
- unseqFile.startTime > seqFile.startTime && unseqFile.endTime > seqFile.endTime,将 unseqFile.startTime 修改为 seqFile.endTime, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
- unseqFile.startTime > seqFile.startTime && unseqFile.endTime < seqFile.endTime,将 unseqFile 删除, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
...
- 为 unseqFile 建立 Reader
- 获取 unseqFile 中的所有元数据 Map<Device, Map<Sensor, List<ChunkMetadata>> deviceSensorsMetadata
- 新建 fileWriterBefore 和 fileWriterAfter
- for device, sensors in deviceSensorsMetadata
- fileWriterBefore.startChunkGroup(device), fileWriterAfter.startChunkGroup(device)
- 新建 chunkWriterBefore, chunkWriterAfter
- hasStartChunkGroupBefore = false, hasStartChunkGroupAfter = false
- for sensor in sensors
- 找到这个 sensor 对应的 chunkMetadataList
- for chunkMetadata in chunkMetadataList
- 获取对应的 Chunk
- if Chunk.startTime >= seqFile.startTime && chunk.endTime <= seqFile.endTime
- continue
- else if Chunk.endTime < seqFile.startTime
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- fileWriterBefore.writeChunk(chunk)
- else if Chunk.startTime > seqFile.endTime
- if !hasStartChunkGroupAfter : fileWriterAfter.startChunkGroup(device), hasStartChunkGroupAfter=true
- fileWriterAfter.writeChunk(chunk)
- else
- 为这个 Chunk 构建一个 ChunkReader
- while ChunkReader.hasNext() && chunkReader.currentTimestamp < seqFile.startTime
- chunkWriterBefore.write(chunkReader.next())
- if chunkWriterBefore.size > flush_threshold
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterBefore to fileWriterBefore
- while ChunkReader.hasNext() && chunkReader.currentTimestamp <= seqFile.endTime
- chunkReader.next()
- while chunkReader.hasNext():
- chunkWriterAfter.write(chunkReader.next())
- if chunkWriterAfter.size > flush_threshold
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterAfter to fileWriterAfter
- if chunkWriterAfter.size > 0
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterAfter to fileWriterAfter
- if chunkWriterBefore.size > 0
- if !hasStartChunkGroupBefore : fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
- flush chunkWriterBefore to fileWriterBefore
- fileWriterBeforeif hasStartChunkGroupBefore: fileWriterBefore.endChunkGroup(device), fileWriterAfter
- if hasStartChunkGroupAfter: fileWriterAfter.endChunkGroup(device)
- 将拆分后的两个文件分别命名为 {原文件时间戳 - 1}-{原文件版本号}-{0}-{1}.tsfile 和 {原文件时间戳 - 1}-{原文件版本号}-{0}-{1}.tsfile tsfile,并删除原文件
算法1
输入:待合并的 sensor
- if 这个 sensor 存在于 deviceChunkMetadataMap 中
- 从 deviceChunkMetadataMap 中获取该 sensor 在 seqFile 中的 sensorChunkMetadataList
- for chunkMetadata in sensorChunkMetadataList
- chunk = readMemChunk(chunkMetadata)
- unclosedChunkPoint = 0L
- tsFileResource.updateStartTime(chunkMetadata.startTime)
- tsFileResource.updateEndTime(chunkMetadata.endTime)
- 判断当前 chunk 是否被修改 modified
- if isOverlap(chunkMetadata)
- 将其与乱序数据重叠的部分合并后写入 ChunkWriter(见 算法2)
- unclosedChunkPoint+=写入的点数
- else if isChunkTooSmall(chunkMetadata)
- 将这个 Chunk 解压缩后写入 ChunkWriter
- unclosedChunkPoint+=写入的点数
- else
- if unclosedChunkPoint > 0 || modified
- 将这个 Chunk 解压缩后写入 ChunkWriter
- else
- 将这个 Chunk 不解压缩写入 writer
- if unclosedChunkPoint > 0 || modified
- if unclosedChunkPoint > merge_chunk_point_num_threshold
- 将 ChunkWriter 写入 writer
- unclosedChunkPoint = 0
- 否则
- 通过算法3将乱序数据写入 seqFile 中
...