Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

输入:多个文件的 ChunkMetadataListIterator(iterator简称iterator),Iterator 每次输出的 List 内的 sensor 个数为 max_index_degree(假设有3个文件:file1_Iterator 返回两批 sensor_list:(s1,s2),(s3)    file2, file2_iterator:(s1, s3)  file3_iterator:(s1,s2),(s3,s5)

输出:每轮待合并的 sensor 列表输出:List<sensor>

描述:每个 iterator 取1个 List,找到每个 List 的最大字典序的 sensor,组成集合 S,本次合并从头开始合并到 S 中最小字典序的 sensor_lex_min_lex_min,直到所有都合并完 。并从 List 中清除已合并的 sensor,如果此 List 消耗完,获取下一个 Lst

优势:每次至少会消耗完一个 file 的一个 List

...

  • getNextSensors(List<ChunkMetadataListIterator> iterators)


第一轮:file1(s1,s2)+file2(s1,s3)+file3(s1,s2) → S= S=(s2, s3),最小字典序 sensor_lex_min 是 s2 → 从 s2, s3),最小字典序 sensor_lex_min 是 s2 → 从 s1 合并到 合并到 s2s2,输出 (s1,s2)

第二轮:file1(s3)+file2(s3)+file3(s3,s5)  → S=S=(s3, s5, s5),最小字典序 sensor_lex_min 是 s3 →  从 s3 合到 s3,最小字典序 sensor_lex_min 是 s3 →  从 s3 合到 s3s3, 输出(s3)

第三轮:file3(s5)  → S=S=(s5),最小字典序 sensor_lex_min 是 s5 → 从 s5 合到 s5,最小字典序 sensor_lex_min 是 s5 → 从 s5 合到 s5s5,输出(s5)




反序列化Page合并算法

  • 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
  • 遍历上述的time-value列表,将数据写入新的ChunkWriter
  • 判断限流
  • ChunkWriter写入新文件

...

  • 获取 SeqTsFileList和UnseqTsFileList 中的所有设备 deviceSet中的所有设备和传感器 deviceSensorsMap(Map<Device, List<Sensor>>)
  • 新建一个 Map<TsFileResource, RestorableFileWriter> newWriterCache;
  • 新建一个 Map<TsFileResource, TsFileResource> newTsFileResourceCache;
  • for device in deviceSet, sensors in deviceSensorsMap
    • 为 sensors 构建一个 bitMap,记录是否被合并,默认全部为 false
    • for seqFile in  SeqTsFileList
      • 从 newTsFileResourceCache 中获取,若不存在则新建一个 TsFileResource writer 
      • 从 newWriterCache 中获取,若不存在则新建一个
      • 新建一个临时 seqFile, 并建立 RestorableFileWriter writer 
      • writer.startChunkGroup(device)
      • 对于 seqFile 新建 ChunkMetadataListIterator
      • ChunkMetadataListIterator 中迭代获取该设备的 sensorList 及其对应的 List<ChunkMetadata> sensorChunkMetadataList(每次返回 max_degree_of_index_node 个 sensor),对于每一个迭代,并获取 ModificationList,对于每一个迭代
        • for sensor, sensorChunkMetadataList:
          • 如果 sensorChunkMetadataList 不为空
            • 根据 算法1 将该 sensor 的 Chunk 与对应的 unseqReader 中的数据进行合并
            • 将该 sensor 在 bitMap 上的位置为 true
      • 如果当前的 seqFile 是最后一个 seqFile:
        • 将该 device 下在 bitMap 中记录未合并的 sensor 对应的 unseqReader 的数据写入这个 seqFile 的临时文件中
        • for unseqReader in unseqReaderList
          • if unseqReader 未读完
            • 新建一个 ChunkWriter 
            • 根据 算法3 将未读完的 unseqReader 的剩余数据写入 ChunkWriter 中
            • 将 ChunkWriter 写入到writer
      • writer.endChunkGroup()
  • for tsFileResource in newTsFileResourceCache
    • tsFileResource.serialize()
    • tsFileResource.close()
  • for writer in newWriterCache
    • writer.endFile()

算法1

输入:sensor, sensorChunkMetadataList, unseqReader, tsFileResource, modification

  • for chunkMetadata in sensorChunkMetadataList
    • chunk = readMemChunk(chunkMetadata)
    • unclosedChunkPoint = 0L
    • tsFileResource.updateStartTime(chunkMetadata.startTime)
    • tsFileResource.updateEndTime(chunkMetadata.endTime)
    • 判断当前 chunk 是否被修改 modified
    • if isOverlap(chunkMetadata)
      • 将其与乱序数据重叠的部分合并后写入 ChunkWriter(见 算法2
      • unclosedChunkPoint+=写入的点数
    • else if isChunkTooSmall(chunkMetadata)
      • 将这个 Chunk 解压缩后写入 ChunkWriter
      • unclosedChunkPoint+=写入的点数
    • else
      • if  unclosedChunkPoint > 0 || modifiedif  unclosedChunkPoint>0
        • 将这个 Chunk 解压缩后写入 ChunkWriter
      • else 
        • 将这个 Chunk 不解压缩写入 writer
    • if unclosedChunkPoint > merge_chunk_point_num_threshold
      • 将 ChunkWriter 写入 writer
      • unclosedChunkPoint = 0

算法2

输入:chunk, unseqReader, deviceEndTime, tsFileResource, modification

  • 对 Chunk 构建一个 ChunkReader
  • 使用 ChunkReader 获取 Chunk 中的每一个 Page
    • while pageData.hasNext() 
      • 获取 pageData 当前的时间戳 seqTime
      • overwriteSeqPoint = Falsefalse
      • while 该 sensor 还有乱序数据并且乱序数据的下一个时间戳小于 seqTime
        • 将乱序数据写入 ChunkWrtier 中
        • tsFileResource.updateStartTime(乱序数据)
        • tsFileResource.updateEndTime(乱序数据)
        • 如果乱序数据的时间戳 == seqTime
          • overwriteSeqPoint = Truetrue
      • 根据 算法3 将该 unseqReader 小于等于 deviceEndTime 的剩余数据写入 ChunkWriter 中
      • if !overwriteSeqPoint && !pageData 当前时间戳在 modification 中被删除
        • 将 pageData 当前的数据点写入 ChunkWriter 中
        • tsFileResource.updateStartTime(乱序数据)
        • tsFileResource.updateEndTime(乱序数据)

算法3

输入:chunkWriter, unseqReader, timeLimit, tsFileResource

  • while unseqReader 还有数据 && 数据时间戳<=timeLimit
    • 将 unseqReader 当前数据写入 chunkWriter
    • tsFileResource.updateStartTime(乱序数据)
    • tsFileResource.updateEndTime(乱序数据)