空间内合并 SizeTieredCompactionTask

  1. 为每个待合并源文件加读锁并set IsCompacting为true
  2. 创建临时的目标文件,为 .inner 文件
  3. 创建合并日志,为 .compaction.log 文件
    1. 写入待合并源文件路径
    2. 写入“sequence”或者“unsequence”,标记是顺序 or 乱序空间内合并
    3. 写入目标文件路径
  4. 执行具体的空间内合并流程:
    1. 创建目标文件 RestorableTsFileIOWriter 对象(会开启一个新 TsFile,往输出缓存写入 magic string 和 version)

    2. 获取所有待合并源文件的所有设备ID,并遍历每个设备ID(开始合并每个设备里的数据):

      1. 往目标文件里写入该设备的 ChunkGroupHeader

      2. //下方红色文字为“算法 1”,算法 1 见下方

      3. 遍历每个待合并文件中该设备在泽嵩树上的所有 TimeseriesMetadata 节点:

        • 在 chunkMetadataListCacheForMerge 里(其格式为:TsFileSequenceReader→ measurementId → chunkMetadataList,我们可以看成每个待合并文件有一个桶,该桶存放该待合并文件的该设备下的一批传感器的 ChunkMetadataList),若某待合并文件的桶为空,则获取该待合并文件该设备的下个 TimeseriesMetadata 节点上的所有传感器以及各自对应的 ChunkMetadataList放入该文件的桶里。(注意:泽嵩树上TimeseriesMetadata 节点上,是按照 measurementID 的字典升序排列 TimeseriesIndex 的,因此获得的传感器 measurementId 是按照字典序升序排列的

        • 初始化临时 Sensor 列表 candidateSensors,把所有文件的桶里的所有传感器 ID 放入 candidateSensors 里找出每个待合并文件的桶里字典序最大的 measurementId 放入集合 S 中并令 lastSensor 为集合 S 中最小的 measurementId

        • 循环遍历 candidateSensors 中字典序不大于 lastSensor 的(即算法 1 的输出) 
          • 获取包含当前 sensor 的待合并文件的 TsFileSequenceReader 和该 sensor 在该待合并文件里的 ChunkMetadataList 放入 readerChunkMetadataListMap 变量里(其格式为:TsFileSequenceReader → chunkMetadataList。注意:此时该变量里存在该 sensor 的每个待合并文件的 version 是从旧到新的,即 timestamp 一定是递增的)。遍历该变量的每个元素,即存在该 sensor 的 TsFileSequenceReader 和对应的 ChunkMetadataList:
            • 若是乱序空间内合并以数据点为单位进行合并重写:把多个 oldChunk 里符合条件的数据点写到目标文件 pageWriter 里,然后 flush 到目标文件 writer 的输出缓存里。(具体:获取每个包含该sensor的待合并文件的删除操作,根据每个文件对该序列的删除操作获取该 sensor 所有符合条件的数据点,然后创建目标文件对应该 sensor 的 chunkWriterImpl,依次把所有数据点写入,最后刷到目标文件 writer 里的缓存里)
            • 若是顺序空间内合并判断包含该 sensor 的每个待合并文件里的该 sensor 的每个 Chunk 里数据点数量是否超过系统预设的 Chunk 或者 page 数据点数量,分别用 isChunkEnoughLarge 和 isPageEnoughLarge 标记。此处查询每个包含该 sensor 的待合并 TsFile 对该 sensor 序列是否有删除操作,若有则把前面两个参数置为 false,因为可能删掉数据后数据点数量就没有那么多了,并且就只能按数据点进行合并重写到目标文件,因为要过滤掉被删除的数据点
              • 若 isChunkEnoughLarg 为 true以 chunk 为单位进行合并:把多个 oldChunk 按序(按时间戳递增)直接追加写入到目标文件 writer 的输出缓存流里。(具体:当该 sensor 传感器在原先待合并的所有 TsFile 里的所有 Chunk 的数据点数量都大于系统预设 Chunk 数据点数量,则直接往目标文件里依次追加写入该 sensor 的这些 Chunk,写入的同时用限制器限流,并更新目标文件该设备的开始和结束时间)
              • 若 isPageEnoughLarge 为 true以 page 为单位进行合并:把多个 oldChunk 的 chunkData 以 page 为单位追加写到一个 newChunk 的 chunkData 里。极端情况是有 n 个 oldChunk,每个 oldChunk 都只有一个page,则 newChunk 就只有 n 个 page。(具体:当该 sensor 在原先待合并的所有 TsFile 里存在一个以上 Chunk 的数据点数量小于系统预设 Chunk 数据点数量 但是所有 Chunk 的数据点数量都大于系统预设 page 数据点数量,则把该 sensor 在不同待合并文件里的所有 Chunk 和 ChunkMetadata 合并到第一个 Chunk 和 ChunkMetadata 里,并把合并后的新 Chunk 写入到目标文件 writer 的输出缓存里)
              • 否则以数据点为单位进行合并重写(同乱序空间内合并)
      4. 结束此设备的 ChunkGroup,将目标文件 writer 输出缓存的内容 flush 到本地文件里

    3. 往目标文件 writer 的输出缓存里写对应索引区的内容以及文件结尾的 Magic String 等,最后 flush 到本地并关闭该 writer 的输出流。

  5. 将临时目标文件 .inner 文件重命名为 .tsfile 文件,并序列化目标文件的 resource 文件,即 .tsfile.resource 文件
  6. 合并过程中对所有待合并源文件产生的 .compaction.mods 文件里的删除操作合并到目标文件的 .mods 文件里
  7. 更新相关内存:
    1. 给TsFileManager加写锁 
    2. 更新 TsFileResourceManager,移除源文件的 TsFileResource 并增加目标文件的 TsFileResource
    3. 更新 TsFileResourceList,移除 TsFileResourceList 里源文件的 TsFileResource,并在相同位置增加目标文件的 TsFileResource
    4. 给TsFileManager释放写锁
  8. 给每个源文件释放读锁,并加写锁
  9. 删除本地源文件和对应的.resource文件
  10. 删除源文件对应的本地.mods文件和.compaction.mods文件
  11. 删除合并日志
  12. 给每个源文件释放写锁,并set isCompacting为false,isCompactionCandidate为false


算法1:

输入:多个文件的 ChunkMetadataListIterator(简称iterator),Iterator 每次输出的 List 内的 sensor 个数为 max_index_degree(假设有3个文件:file1_Iterator 返回两批 sensor_list:(s1,s2),(s3) , file2_iterator:(s1, s3)  file3_iterator:(s1,s2),(s3,s5))

输出:List<sensor>

描述:每个 iterator 取1个 List,找到每个 List 的最大字典序的 sensor,组成集合 S,本次合并从头开始合并到 S 中最小字典序的 sensor_lex_min,直到所有都合并完 。并从 List 中清除已合并的 sensor,如果此 List 消耗完,获取下一个 Lst

优势:每次至少会消耗完一个 file 的一个 List


第一轮:file1(s1,s2)+file2(s1,s3)+file3(s1,s2) → S=(s2, s3, s2),最小字典序 sensor_lex_min 是 s2,输出 (s1,s2)

第二轮:file1(s3)+file2(s3)+file3(s3,s5)  → S=(s3, s3, s5),最小字典序 sensor_lex_min 是 s3, 输出(s3)

第三轮:file3(s5) → S=(s5),最小字典序 sensor_lex_min 是 s5,输出(s5)


跨空间合并 RewriteCrossSpaceCompactionTask

  1. 为每个待合并源文件加读锁并set IsCompacting为true
  2. 创建临时的目标文件列表,为 .cross 文件
  3. 创建合并日志,为 .compaction.log 文件
  4. 调用CompactionUtils工具的compact接口,执行具体的跨空间合并。
  5. 将临时目标文件 .cross文件重命名为 .tsfile 文件,并序列化目标文件的 resource 文件,即 .tsfile.resource 文件
  6. 合并过程中对所有待合并源文件产生的 .compaction.mods 文件里的删除操作合并到目标文件的 .mods 文件里
  7. 更新相关内存:
    1. 给TsFileManager加写锁
    2. 更新 TsFileResourceManager,移除源文件的 TsFileResource 并增加目标文件的 TsFileResource
    3. 更新TsFileResourceList,移除 TsFileResourceList 里源文件的 TsFileResource,并在相同位置增加目标文件的 TsFileResource
    4. 给TsFileManager释放写锁
  8. 给每个源文件释放读锁,并加写锁
  9. 删除本地源文件和对应的.resource文件
  10. 删除源文件对应的本地.mods文件和.compaction.mods文件
  11. 删除合并日志
  12. 给每个源文件释放写锁,并set isCompacting为false,isCompactionCandidate为false


CompactionUtils工具

该工具可用于空间内和跨空间的合并,而目前对于顺序空间内合并我们使用的是InnerSpaceCompactionUtils以加速合并。

Compact接口

流程:
    1. 注册queryId,其中queryID=Long.Min+合并线程号
    2. 根据合并类型构建对应的CompactionWriter(CrossSpaceCompactionWriter或InnerSpaceCompactionWriter)
    3. 使用迭代器 迭代地获取所有源文件里的deviceid,对每个设备获取其所有的measurements:
    4. 对每个序列构建SeriesRawBatchReader,依次读出整理(排序且去重)后的每个数据点并写入目标文件中
注意:
    1. 跨空间合并中,写入目标文件的每个设备的数据点的时间戳不大于对应源文件的deviceMaxEndTime.
    2. 若乱序文件里有新数据(即乱序文件里出现顺序文件里没有的device,后者乱序序列的时间大于顺序文件里的最大时间的数据点),则往最后一个目标文件里写
    3. 若合并后,某个源文件对应的mods把其数据都删掉了,则该源文件的targetFileResource会从接口的targetResource列表中移


合并的恢复

  1. 读取合并日志里的内容,若合并日志不完整(即待合并源文件或着目标文件为空),则删除日志文件并返回。
  2. 若所有的待合并源文件都存在,则删除相关文件:包括临时目标文件、.tsfile 目标文件及其.tsfile.resource 文件和.mods 文件,还有源文件的.compaction.mods文件。最后删除日志文件。
  3. 若非所有的待合并源文件都存在(即部分源文件被删除,说明 .tsfile 目标文件和  .tsfile.resource  文件一定存在且  .target  临时目标文件一定不存在,并且目标文件的 .mods 文件已生成),则删除剩余的源文件以及每个源文件对应的 .mods 和 .compaction.mods 文件,最后删除日志文件。

      注意:若以上有任何一步执行出错或者抛异常,则 tsFileManager.setAllowCompaction(false) 禁止系统执行后续的其余合并任务。


合并的异常处理

  1. 若合并日志不存在,说明尚未开始执行合并,则什么都不做。
  2. 若所有的待合并源文件都存在,则
    1. 删除相关文件:包括临时目标文件、.tsfile 目标文件及其.tsfile.resource 文件和.mods 文件,还有源文件的.compaction.mods文件。注意:删除.tsfile目标文件的前后需要加写锁释放写锁,而删除临时目标文件则不用。
    2. 恢复相关内存:
      1. 给TsFileManager加写锁
      2. 恢复TsFileResourceList:移除目标文件的tsFileResource,并在相应位置插入所有源文件的tsFileResource
      3. 恢复TsFileResourceManager:移除目标文件的tsFileResource,并增加所有源文件的tsFileResource
      4. 给TsFileManager释放写锁
  3. 若非所有的待合并源文件都存在,即部分源文件被删除,此时会占用TsFileManager和各源文件的写锁。
    1. 若目标文件不存在,说明数据丢失,则设置系统为只读模式,即 setReadOnly(true)
    2. 若目标文件存在
      1. 若目标文件是完整的,则删除剩余的源文件以及每个源文件对应的 .mods 和 .compaction.mods 文件。
      2. 若目标文件是不完整的,说明丢失数据,则设置系统为只读模式,即 setReadOnly(true)
  4. 若以上操作没有任何一步抛异常并且都执行成功,则删除合并日志;否则, tsFileManager.setAllowCompaction(false) 禁止系统执行后续的其余合并任务。


合并的加锁流程

  1. 选择待合并的源文件,并设置setCompactionCandidate(true),将它们封装入一个合并任务里,并放入等待队列中。
  2. 从等待队列里拿出来,依次对每个源文件加读锁,并检查该源文件是否Valid,若isValidsetCompacting(true),否则释放所有源文件的读锁并setCompacting(false)
  3. 执行合并,并将临时目标文件移动成最终目标.tsfile文件,生成.resource文件,合并compactionMods文件,更新内存:
    1. 给TsFileManager加写锁
    2. 更新 TsFileResourceManager,使用Sychronized移除源文件的 TsFileResource 并增加目标文件的 TsFileResource
    3. 更新TsFileManagerTsFileResourceList,移除源文件的tsfileResource,插入目标文件的tsFileResource
    4. 给TsFileManager释放写锁
  4. 依次对每个源文件释放读锁加写锁,删除源文件和日志。
  5. 对每个源文件释放写锁setCompacting(false)setCompactionCandidate(false).
  • No labels

1 Comment

  1. PRESIDENT OF THE WORLDcharmx48529361489527369814nil.define.