目标场景
假设写入速度是稳定的
场景1:海量小文件,离线合并
场景2:合并速度小于写入速度:10个存储组,每个存储组每天会产生10个文件,每个线程每天能合5个文件,系统最多有10个线程
场景3:合并速度大于写入速度
名词解释
顺序空间:顺序数据文件所在的空间
乱序空间:乱序数据文件所在的空间
顺序空间合并(Seq Compaction):在顺序空间内部进行文件的合并(SizeTired Compaction)
乱序空间合并(Unseq Compaction):在乱序空间内部进行文件的合并(SizeTired Compaction)
跨文件空间合并(Merge):将乱序空间的文件合并到顺序空间
合并的总体任务
合并调度任务
合并执行任务
合并恢复任务
全局合并管理器(与 MManager 同级,系统启动时注册)
static volatile boolean isRecover=false;
init():初始化并恢复
提交异步合并恢复任务
注册定时任务:每delay compaction_interval 启动一次 doCompaction()
doCompaction():执行合并
如果 isRecover = true && enable_continuous_compaction = true
执行 compactionSchedule();
合并调度任务
iotdb-engine.properties 配置项
- compaction_priority=inner_cross(优先执行空间内合并,快速减少文件数)、cross_inner(优先执行跨文件空间合并,消除乱序文件)、balance(平衡减少文件数和消除乱序文件)
- concurrent_compaction_thread=50 (合并执行任务的并行度)
static long currentTaskNum=0;// 当前进行的合并任务个数
调度任务主体工作:
compactionSchedule():
如果 currentTaskNum == concurrent_compaction_thread return
- for storage groups
- 加 resourceList 读锁
- for partitions (倒序遍历)
- 如果 currentTaskNum < concurrent_compaction_thread
- 如果 compaction_priority=inner_cross || balance
- innerSpaceCompaction(); crossSpaceCompaction();
- 如果 compaction_priority=cross_inner
- crossSpaceCompaction(); innerSpaceCompaction();
- 如果 compaction_priority=inner_cross || balance
- 如果 currentTaskNum < concurrent_compaction_thread
- for partitions (倒序遍历)
- 释放 resourceList 读锁
innerSpaceCompaction()
- 候选文件 F=[]
- 倒序遍历顺序文件列表
- 如果(当前文件大小大于 target_tsfile_size || 当前文件正在合并 || 当前文件未关闭),则清空 F,continue;
- 将当前文件加入 F
- 如果 F 文件大小大于 target_tsfile_size,将 F 提交异步任务(顺序文件合并执行流程), F=[];
- 如果 compaction_priority=balance, break;
- 倒序遍历乱序文件列表
- 如果(当前文件大小大于 target_tsfile_size || 当前文件正在合并 || 当前文件未关闭),则清空 F,continue;
- 将当前文件加入 F
- 如果 F 文件大小大于 target_tsfile_size,将 F 提交异步任务(乱序文件合并执行流程), F=[]
- 如果 compaction_priority=balance, break;
crossSpaceCompaction()
- 选择跨文件空间合并候选文件 F
- 将 F 提交异步任务(跨文件空间合并执行流程)
- 如果 compaction_priority=balance, break;