Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • for seqFile in selectedFiles.keySet()
    • 对 seqFile 建立一个 RestorableWriter
    • 获取与该 seqFile 进行合并的乱序文件列表 unseqFiles
    • 获取该 seqFile 的元数据列表 deviceChunkMetadataMap(Map<Device, Map<Sensor, List<ChunkMetadata>>>)
    • 获取 unseqFiles 中所有的时间序列元数据 Map<Device, List<Sensor>> deviceSensorMap
    • for device, sensors in deviceChunkMetadataMap
      • restorableWriter.startChunkGroup(device) 
      • for sensor in sensors
        • 为这个 sensor 在所有的 unseqFiles 上建立一个 unseqReader
        • 根据算法1将数据重写到 seqFile 中
      • restorableWriter.endChunkGroup(device)
    • 处理 unseqFiles,将合并过的数据抹去
    • 为unseqFiles,为unseqFiles增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE),修改其 TsFileResource,抹去合并的数据

...

  • unseqFile.startTime < seqFile.startTime && unseqFile.endTime > seqFile.endTime,不用修改 TsFileResourceendTime,将这个文件拆分成两份
  • unseqFile.startTime < seqFile.startTime && unseqFile.endTime < seqFile.endTime,将 unseqFile.endTime 修改为 seqFile.startTime, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
  • unseqFile.startTime > seqFile.startTime && unseqFile.endTime > seqFile.endTime,将 unseqFile.startTime 修改为 seqFile.endTime, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)
  • unseqFile.startTime > seqFile.startTime && unseqFile.endTime < seqFile.endTime,将 unseqFile 删除, 增加 .mods 文件(storageGroupName, seqFile.startTime, seqFile.endTime, Long.MAX_VALUE)

文件拆分算法

输入:unseqFile,seqFileStartTime, seqFileEndTime

  • 为 unseqFile 建立 Reader
  • 获取 unseqFile 中的所有元数据 Map<Device, Map<Sensor, List<ChunkMetadata>> deviceSensorsMetadata
  • 新建 fileWriterBefore 和 fileWriterAfter
  • for device, sensors in deviceSensorsMetadata
    • fileWriterBefore.startChunkGroup(device), fileWriterAfter.startChunkGroup(device)
    • 新建 chunkWriterBefore, chunkWriterAfter
    • for sensor in sensors
      • 找到这个 sensor 对应的 chunkMetadataList
      • for chunkMetadata in chunkMetadataList
        • 获取对应的 Chunk
        • if Chunk.startTime >= seqFile.startTime && chunk.endTime <= seqFile.endTime
          • continue
        • else if Chunk.endTime < seqFile.startTime
          • fileWriterBefore.writeChunk(chunk)
        • else if Chunk.startTime > seqFile.endTime
          • fileWriterAfter.writeChunk(chunk)
        • else
          • 为这个 Chunk 构建一个 ChunkReader
          • while ChunkReader.hasNext() && chunkReader.currentTimestamp < seqFile.startTime
            • chunkWriterBefore.write(chunkReader.next())
            • if chunkWriterBefore.size > flush_threshold
              • flush chunkWriterBefore to fileWriterBefore
          • while ChunkReader.hasNext() && chunkReader.currentTimestamp <= seqFile.endTime
            • chunkReader.next()
          • while chunkReader.hasNext():
            • chunkWriterAfter.write(chunkReader.next())
            • if chunkWriterAfter.size > flush_threshold
              • flush chunkWriterAfter to fileWriterAfter
        • if chunkWriterAfter.size > 0
          • flush chunkWriterAfter to fileWriterAfter
        • if chunkWriterBefore.size > 0
          • flush chunkWriterBefore to fileWriterBefore
    • fileWriterBefore.endChunkGroup(device), fileWriterAfter.endChunkGroup(device)
  • 将拆分后的两个文件分别命名为 {原文件时间戳 - 1}-{原文件版本号}-{0}-{1}.tsfile 和 {原文件时间戳 - 1}-{原文件版本号}-{0}-{1}.tsfile 

算法1

输入:待合并的 sensor

  • if 这个 sensor 存在于 deviceChunkMetadataMap 中
    •  从 deviceChunkMetadataMap 中获取该 sensor 在 seqFile 中的 sensorChunkMetadataList
    • for chunkMetadata in sensorChunkMetadataList
      • chunk = readMemChunk(chunkMetadata)
      • unclosedChunkPoint = 0L
      • tsFileResource.updateStartTime(chunkMetadata.startTime)
      • tsFileResource.updateEndTime(chunkMetadata.endTime)
      • 判断当前 chunk 是否被修改 modified
      • if isOverlap(chunkMetadata)
        • 将其与乱序数据重叠的部分合并后写入 ChunkWriter(见 算法2
        • unclosedChunkPoint+=写入的点数
      • else if isChunkTooSmall(chunkMetadata)
        • 将这个 Chunk 解压缩后写入 ChunkWriter
        • unclosedChunkPoint+=写入的点数
      • else
        • if  unclosedChunkPoint > 0 || modified
          • 将这个 Chunk 解压缩后写入 ChunkWriter
        • else 
          • 将这个 Chunk 不解压缩写入 writer
      • if unclosedChunkPoint > merge_chunk_point_num_threshold
        • 将 ChunkWriter 写入 writer
        • unclosedChunkPoint = 0
  • 否则
    • 通过算法3将乱序数据写入 seqFile 中

...