Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • 将顺序文件列表按照时间顺序从新到老排序得到 seqFileListOrderByTime
  • 将乱序文件列表按照版本从老到新进行排序得到 unseqFileListOrderByVersion
  • selectedFiles=Map<TsFileResource, List<TsFileResource>>, selectedUnseqFiles=Set<TsFileResource>
  • hasSelect = false
  • 遍历顺序文件列表 for seqFile in seqFileListOrderByTime
    • 如果 seqFile 正在合并 || seqFile 未关闭
      • 如果 selectedFiles 非空
        • 提交一个合并任务,清空 selectedFiles 和 selectedUnseqFiles, hasSelect = true, ++currentTaskNum
        • if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
          • return hasSelect
      • continue
    • 遍历乱序文件列表 for unseqFile in unseqFileListOrderByVersion
      • if unseqFile 和 seqFile 有数据重叠数据重叠
        • if unseqFile 未关闭 || unseqFile 正在合并
          • 如果 selectedFiles 非空
            • 提交一个合并任务,清空 selectedFiles, hasSelect = true, ++currentTaskNum
            • if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
              • return hasSelect
          • break
        • selectedFiles[seqFile].add(unseqFile)
        • selectedUnseqFiles.add(unseqFile)
        • if selectedUnseqif selectedFiles[seqFile].size() > compaction_cross_space_max_select_unseq_file_num
          • 提交一个合并任务,清空 selectedFiles, hasSelect = true, ++currentTaskNum
          • if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
            • return hasSelect
          • break
    • if selectedFiles.size() > compaction_cross_space_max_select_seq_file_num
      • 提交一个合并任务,清空 selectedFiles, hasSelect = true, ++currentTaskNum
      • if currentTaskNum >= MAX_COMPACTION_THREAD_NUM || compactionStrategy = BALANCE
        • return hasSelect


判断顺序文件与乱序文件是否发生重叠

输入:seqFile、unseqFile

  • return (seqFile.startTime >= unseqFile.startTime && seqFile.startTime <= unseqFile.endTime) || (seqFile.endTime >= unseqFile.startTime && seqFile.endTime <= unseqFile.endTime)

执行过程

将这些文件进行跨文件空间的合并

正序遍历提交的顺序文件,对于每一个顺序文件

...

  • if 这个 sensor 存在于 deviceChunkMetadataMap 中
    •  从 deviceChunkMetadataMap 中获取该 sensor 在 seqFile 中的 sensorChunkMetadataList
    • for chunkMetadata in sensorChunkMetadataList
      • chunk = readMemChunk(chunkMetadata)
      • unclosedChunkPoint = 0L
      • tsFileResource.updateStartTime(chunkMetadata.startTime)
      • tsFileResource.updateEndTime(chunkMetadata.endTime)
      • 判断当前 chunk 是否被修改 modified
      • if isOverlap(chunkMetadata)
        • 将其与乱序数据重叠的部分合并后写入 ChunkWriter(见 算法2
        • unclosedChunkPoint+=写入的点数
      • else if isChunkTooSmall(chunkMetadata)
        • 将这个 Chunk 解压缩后写入 ChunkWriter
        • unclosedChunkPoint+=写入的点数
      • else
        • if  unclosedChunkPoint > 0 || modified
          • 将这个 Chunk 解压缩后写入 ChunkWriter
        • else 
          • 将这个 Chunk 不解压缩写入 writer
      • if unclosedChunkPoint > merge_chunk_point_num_threshold
        • 将 ChunkWriter 写入 writer
        • unclosedChunkPoint = 0
  • 否则
    • 通过算法3将乱序数据写入 seqFile 中


算法2

输入:chunk, unseqReader, deviceEndTime, tsFileResource, modification

  • 对 Chunk 构建一个 ChunkReader
  • 使用 ChunkReader 获取 Chunk 中的每一个 Page
    • while pageData.hasNext() 
      • 获取 pageData 当前的时间戳 seqTime
      • overwriteSeqPoint = false
      • while 该 sensor 还有乱序数据并且乱序数据的下一个时间戳小于 seqTime
        • 将乱序数据写入 ChunkWrtier 中
        • tsFileResource.updateStartTime(乱序数据)
        • tsFileResource.updateEndTime(乱序数据)
        • 如果乱序数据的时间戳 == seqTime
          • overwriteSeqPoint = true

...

      • if !overwriteSeqPoint && !pageData 当前时间戳在 modification 中被删除
        • 将 pageData 当前的数据点写入 ChunkWriter 中
        • tsFileResource.updateStartTime(乱序数据)
        • tsFileResource.updateEndTime(乱序数据)


算法3

输入:unseqReader、restorableWriter、seqFile

  • 创建一个 ChunkWriter
  • while unseqReader.currentTimestamp < seqFile.startTime && unseqReader.hasNext()
    • unseqReader.next()
  • while unseqReader.currentTimestamp >= seqFile.startTime && unseqReader.currentTimestamp <= seqFile.endTimeendTime && unseqReader.hasNext()
    • 将 unseqReader 当前的时间点写入 ChunkWriter 中
    • unseqReader.next()
    • 如果 ChunkWriter 中的数据点的个数 > CHUNK_MIN_POINT_NUM
      • flush ChunkWriter to restorableWriter
  • 如果 ChunkWriter 中还有没刷盘的数据
    • flush chunkWriter to restorableWriter

...