Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

compaction_priority=balance

合并任务的并行度


iotdb-enineengine.properties 配置参数:concurrent_compaction_thread=50

...

全局合并管理器(与 MManager 同级,系统启动时注册)

static volatile boolean isRecover=false;


init() 恢复

提交异步合并恢复任务

注册定时任务:间隔 compaction_interval 调度 doCompaction()


doCompaction()

如果 isRecover = true && enable_continuous_compaction = true

提交异步合并调度任务



合并调度任务

static  long  currentTaskNum=0;// 当前进行的合并任务个数

...

如果 currentTaskNum == concurrent_compaction_thread return

  • for

...

  • storage groups
  • 加 resourceList 读锁
    • for partitions (

...

    • 倒序遍历)

      ...

          • 如果 currentTaskNum < concurrent_compaction_thread

      ...

            • 候选文件 F=[]
            • 倒序遍历顺序文件列表
              • 如果(当前文件大小大于 target_tsfile_size || 当前文件正在合并 || 当前文件未关闭),则清空 F,continue;
              • 将当前文件加入 F,如果 F 文件大小大于 target_tsfile_size,将 F 提交异步任务(顺序文件合并执行流程), F=[]
              • 如果 compaction_priority=balance, break;
            •  倒序遍历乱序文件列表
              • 如果(当前文件大小大于 target_tsfile_size || 当前文件正在合并 || 当前文件未关闭),则清空 F,continue;
              • 将当前文件加入 F,如果 F 文件大小大于 target_tsfile_size,将 F 提交异步任务(乱序文件合并执行流程), F=[]
              • 如果 compaction_priority=balance, break;
            • 选择跨文件空间合并候选文件并提交任务

                如果 currentTaskNum = concurrent_compaction_thread ,return

                如果 currentTaskNum < concurrent_compaction_thread ,通过空间内选择算法在乱序文件列表中找到找到目标文件大小个 TsFile, 提交乱序文件合并执行Task,如果 compaction_priority=balance:break;

                如果 currentTaskNum = concurrent_compaction_thread ,return

              find 跨文件空间(顺序遍历乱序文件)

                    如果 currentTaskNum < concurrent_compaction_thread ,通过跨空间选择算法找到目标文件大小个 TsFile, 提交跨文件空间合并执行Task,如果 compaction_priority=balance:break;

                    如果 currentTaskNum = concurrent_compaction_thread ,return

      空间内选择算法

      整体流程

      合并任务的触发时机

      系统启动时,每个 StorageGroupProsessor 的 recover 过程中

      • (同步)恢复当前虚拟存储组
      • 跨文件空间合并
      • (异步)恢复层级合并,当前只会有一个待恢复的任务,顺序或乱序
      • 如果开启了连续合并(enable_continuous_compaction=true),则对每个分区提交异步合并任务
        • 如果 COMPACTION_STRATEGY=NO_COMPACTION,则什么合并都不做,直接返回
        • 如果 COMPACTION_STRATEGY=LEVEL_COMPACTION,执行系统整体合并
          • 如果开启乱序合并(enable_unseq_compaction=true),且乱序空间最高层存在乱序文件,则执行跨文件空间合并
          • 对此分区的顺序空间从Level-0开始逐层向上寻找可合并的层级,合并一层后返回
          • 对此分区的乱序空间从Level-0开始逐层向上寻找可合并的层级,合并一层后返回

      文件关闭时

      合并任务提交

      • 如果 compactionMergeWorking 变量为true,则说明上一次合并还在进行,跳过本次合并
      • 如果 compactionMergeWorking 变量为false,说明本存储组没有合并任务在进行
        • 设置 compactionMergeWorking 为true
        • 拷贝当前文件列表镜像,见 算法1
        • 设置如果进行乱序合并,是否进行 FullMerge 
        • 提交合并任务

      算法1

      • 加读锁
      • 按层拷贝当前时间分区除了最高层之外所有已经关闭的顺序文件
      • 按层拷贝当前时间分区所有已经关闭的乱序文件
      • 解读锁

      FullMerge(完全合并)

      出于IO和合并性能考虑,乱序合并不会重写整个顺序文件,获取一个整理完全的顺序文件,而是会将与乱序文件重叠的 chunk 合并重写后,追加到原顺序文件后面,不删除原chunk,称这个过程为 原地合并

      但是过多的 原地合并 会造成无用的数据越来越多,甚至在某些情况下导致一个文件大小无限制地增大(见线上问题分析)

      完全合并 则是在乱序合并时重写整个顺序文件,IO和合并性能较低,但能获取一个整理完全的顺序文件

      系统整体合并

      • 如果开启乱序合并(enable_unseq_compaction=true),且乱序空间最高层存在乱序文件,则执行跨文件空间合并
      • 选择文件并进行顺序空间的层级合并
        • 如果有跨文件空间合并正在进行,等待该任务完成
        • 对除了最高层外的文件
          • 如果本层文件个数>seq_file_num_in_each_level,则进行将本层文件提交合并,详细合并流程见 合并机制执行合并流程文档-层级合并
      • 选择文件并进行乱序空间的层级合并或跨文件空间合并
        • 如果 unseq_level_num==1,提交当前时间分区下所有乱序文件和顺序文件进行合并
        • 对除了最高层外的文件
          • 如果本层文件个数>unseq_file_num_in_each_level
            • 如果开启了跨文件空间的合并,且unseq_level_num-2==当前层index
              • 提交当前时间分区最高层的乱序文件和所有顺序文件进行跨文件空间合并,详细合并流程见 合并机制执行合并流程文档-跨文件空间合并
            • 否则进行将本层文件提交层级合并,详细合并流程见 合并机制执行合并流程文档-层级合并