Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

将其于相应的乱序文件中有重叠的部分进行合并并写入原顺序文件,保留整个乱序文件,并通过增加 .mods 文件以及修改 TsFileResource 来标记已经合并的部分

Image RemovedImage Added

跨文件空间合并执行算法

输入:selectedFiles(Map<TsFileResource, List<TsFileResource>>)

...

  • unseqFile.startTime < seqFile.startTime && unseqFile.endTime > seqFile.endTime,将这个文件拆分成两份endTime,将这个文件根据文件拆分算法拆成两个文件
  • unseqFile.startTime < seqFile.startTime && unseqFile.endTime < seqFile.endTime,将 unseqFile.endTime 修改为 seqFile.startTime, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
  • unseqFile.startTime > seqFile.startTime && unseqFile.endTime > seqFile.endTime,将 unseqFile.startTime 修改为 seqFile.endTime, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
  • unseqFile.startTime > seqFile.startTime && unseqFile.endTime < seqFile.endTime,将 unseqFile 删除, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)

...

  • 为 unseqFile 建立 Reader
  • 获取 unseqFile 中的所有元数据 Map<Device, Map<Sensor, List<ChunkMetadata>> deviceSensorsMetadata
  • 新建 fileWriterBefore 和 fileWriterAfter
  • for device, sensors in deviceSensorsMetadata
    • fileWriterBefore.startChunkGroup(device), fileWriterAfter.startChunkGroup(device)
    • 新建 chunkWriterBefore, chunkWriterAfter
    • hasStartChunkGroupBefore = false, hasStartChunkGroupAfter = false
    • for sensor in sensors
      • 找到这个 sensor 对应的 chunkMetadataList
      • for chunkMetadata in chunkMetadataList
        • 获取对应的 Chunk
        • if Chunk.startTime >= seqFile.startTime && chunk.endTime <= seqFile.endTime
          • continue
        • else if Chunk.endTime < seqFile.startTime
          • if !hasStartChunkGroupBefore :  fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true
          • fileWriterBefore.writeChunk(chunk)
        • else if Chunk.startTime > seqFile.endTime
          • if !hasStartChunkGroupAfter :  fileWriterAfter.startChunkGroup(device), hasStartChunkGroupAfter=true 
          • fileWriterAfter.writeChunk(chunk)
        • else
          • 为这个 Chunk 构建一个 ChunkReader
          • while ChunkReader.hasNext() && chunkReader.currentTimestamp < seqFile.startTime
            • chunkWriterBefore.write(chunkReader.next())
            • if chunkWriterBefore.size > flush_threshold
              • if !hasStartChunkGroupBefore :  fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true 
              • flush chunkWriterBefore to fileWriterBefore
          • while ChunkReader.hasNext() && chunkReader.currentTimestamp <= seqFile.endTime
            • chunkReader.next()
          • while chunkReader.hasNext():
            • chunkWriterAfter.write(chunkReader.next())
            • if chunkWriterAfter.size > flush_threshold
              • if !hasStartChunkGroupBefore :  fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true 
              • flush chunkWriterAfter to fileWriterAfter
        • if chunkWriterAfter.size > 0
          • if !hasStartChunkGroupBefore :  fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true 
          • flush chunkWriterAfter to fileWriterAfter
        • if chunkWriterBefore.size > 0
          • if !hasStartChunkGroupBefore :  fileWriterBefore.startChunkGroup(device), hasStartChunkGroupBefore=true 
          • flush chunkWriterBefore to fileWriterBefore
    • fileWriterBeforeif hasStartChunkGroupBefore: fileWriterBefore.endChunkGroup(device), fileWriterAfter
    • if hasStartChunkGroupAfter: fileWriterAfter.endChunkGroup(device)
  • 将拆分后的两个文件分别命名为 {原文件时间戳 - 1}-{原文件版本号}-{0}-{1}.tsfile 和 {原文件时间戳 - 1}-{原文件版本号}-{0}-{1}.tsfile,并删除原文件

...