Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents


目标场景

假设写入速度是稳定的

场景1:海量小文件,离线合并

场景2:合并速度小于写入速度:10个存储组,每个存储组每天会产生10个文件,每个线程每天能合5个文件,系统最多有10个线程

场景3:合并速度大于写入速度


名词解释

顺序空间:顺序数据文件所在的空间

乱序空间:乱序数据文件所在的空间


顺序层级合并(Level Compaction):在顺序空间内部进行文件的层级合并(SizeTired 顺序空间合并(Seq Compaction):在顺序空间内部进行文件的合并(SizeTired Compaction)

乱序层级合并(Level Compaction):在乱序空间内部进行文件的层级合并(SizeTired 乱序空间合并(Unseq Compaction):在乱序空间内部进行文件的合并(SizeTired Compaction)

跨文件空间合并(Merge):将乱序空间的乱序文件合并到顺序空间中:将乱序空间的文件合并到顺序空间目标场景


合并的总体任务

假设写入速度是稳定的合并调度任务

场景1:海量小文件,离线合并

场景2:10个存储组,每个存储组每天会产生10个文件,每个线程每天能合5个文件,系统最多有10个线程

合并任务(空间内和跨空间)的优先级

iotdb-engine.properties 配置参数:compaction_priority=inner_cross

目标:快速减少文件个数

compaction_priority=inner_cross

目标:消除乱序文件

compaction_priority=cross_inner

目标:平衡减少文件数和消除乱序文件

compaction_priority=balance

合并任务的并行度

iotdb-engine.properties 配置参数:concurrent_compaction_thread=50

合并执行任务

合并恢复任务


全局合并管理器(与 MManager 同级,系统启动时注册)

static volatile boolean isRecover=false;

init()

...

:初始化并恢复

提交异步合并恢复任务

注册定时任务:间隔 注册定时任务:每delay compaction_interval 调度 启动一次 doCompaction()


doCompaction():执行合并

如果 isRecover = true && enable_continuous_compaction = true

提交异步合并调度任务

合并调度任务

执行 compactionSchedule();



合并调度任务


iotdb-engine.properties 配置项

  • compaction_priority=inner_cross(优先执行空间内合并,快速减少文件数)、cross_inner(优先执行跨文件空间合并,消除乱序文件)、balance(平衡减少文件数和消除乱序文件)
  • concurrent_compaction_thread=50  (合并执行任务的并行度)


static  long  currentTaskNum=0;// 当前进行的合并任务个数


调度任务主体工作:

runcompactionSchedule():

如果 currentTaskNum == concurrent_compaction_thread return

  • for storage groups
  • 加 resourceList 读锁
    • for partitions (倒序遍历)
      • 如果 currentTaskNum < concurrent_compaction_thread
        • 如果 compaction_priority=inner_cross || balance
          • innerSpaceCompaction();  crossSpaceCompaction();
        • 如果 compaction_priority=cross_inner
          • crossSpaceCompaction();  innerSpaceCompaction(); 
  • 释放 resourceList 读锁


innerSpaceCompaction()

  • 候选文件 F=[]
  • 倒序遍历顺序文件列表
    • 如果(当前文件大小大于 target_tsfile_size || 当前文件正在合并 || 当前文件未关闭),则清空 F,continue;
    • 将当前文件加入 F
    • 如果 F,如果 F 文件大小大于 target_tsfile_size,将 F 提交异步任务(顺序文件合并执行流程), F=[]
    • 如果 compaction_priority=balance, break;
  •  倒序遍历乱序文件列表
    • 如果(当前文件大小大于 target_tsfile_size || 当前文件正在合并 || 当前文件未关闭),则清空 F,continue;
    • 将当前文件加入 F
    • 如果 F,如果 F 文件大小大于 target_tsfile_size,将 F 提交异步任务(乱序文件合并执行流程), F=[]
    • 如果 compaction_priority=balance, break;选择跨文件空间合并候选文件并提交任务


crossSpaceCompaction()

  • 选择跨文件空间合并候选文件 F
  • 将 F 提交异步任务(跨文件空间合并执行流程)
  • 如果 compaction_priority=balance, break;