Table of Contents |
---|
设计原则
控制合并任务提交的入口数量减少合并任务提交的入口数量
能够在原始写入的文件大小不一的情况下,控制合并目标文件大小(如初始写出的 能够在原始写入的文件大小不一的情况下(如初始写出的 TsFile 大小为 100M,10k,20M)100M,10k,20M),控制合并目标文件大小
将 TsFileManagement 里管理 TsFileResource 的数据结构与合并机制分离,固定管理 的数据结构与 合并机制分离,固定管理 TsFileResource 的数据结构
能够做到时间分区内部合并任务的并行
...
顺序空间:顺序数据文件所在的空间
乱序空间:乱序数据文件所在的空间
顺序合并(Seq 顺序空间合并(Seq Compaction):在顺序空间内部进行文件的合并(SizeTired Compaction)
乱序合并(Unseq 乱序空间合并(Unseq Compaction):在乱序空间内部进行文件的合并(SizeTired Compaction)
顺乱序合并跨空间合并(Cross Compaction):将乱序空间的文件合并到顺序空间
...
合并恢复任务(重启时执行一次):将中断的合并执行任务继续执行
...
合并调度的参数
enable_seq_space_compaction=true
...
compaction_priority=inner_cross(优先执行空间内合并,快速减少文件数)、cross_inner(优先执行跨文件空间合并,消除乱序文件)、balance(平衡减少文件数和消除乱序文件)
compaction_concurrent_compaction_thread=50(合并执行任务的并行度)
compaction_interval=10000(定时合并任务的提交间隔,单位 10000(合并调度任务的执行间隔,单位 ms)
compaction_candidate_file_num=100 (候选合并文件个数)
compaction_target_tsfile_size=2000000000 (合并的目标文件大小)
TsFileManagement 文件管理器(每个虚拟存储组一个)
...
- 倒序遍历时间分区
- CompactionManager.compactionScheduledoCompactionSchedule(TsFileManagement management,long 分区号)
...
合并管理器 CompactionManager(与 MManager 同级,系统启动时注册)
static volatile boolean isRecovered=false; (是否恢复完成)
...
存储组为之前中断的每个合并执行任务,提交异步合并恢复任务
...
doCompactionSchedule(TsFileManagement management,long 分区号):执行合并调度任务
如果 isRecovered = true,执行 compactionSchedule();
...
合并调度任务 compactionSchedule
static long currentTaskNum=0;// 当前进行的合并任务个数
static compactionSchedule(TsFileManagement tsfileManagement, long 分区号):
- if: currentTaskNum >= compaction_concurrent_compaction_thread: return;
- else:
- tsfileManagement.readLock(resourceList)
- 顺序文件链表=tsfileManagement.getSeqList(分区号)
- 乱序文件链表=tsfileManagement.getUnSeqList(分区号)
- 如果 compaction_priority=balance
- doCompactionInBalancePriority();
- 如果 compaction_priority=inner_cross
- doCompactionInInnerSpacePriority();
- 如果 compaction_priority=cross_inner
- doCompactionInCrossSpacePriority();
- 如果 compaction_priority=balance
- tsfileManagement.readUnLock(resourceList);
...
- hasSelected = true
- while (hasSelected && currentTaskNum < compaction_concurrent_compaction_thread)
- hasSelected = selectInSpaceFilesselectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
- hasSelected |= selectInSpaceFilesselectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
- hasSelected |= crossSpaceCompaction();
...
doCompactionInInnerSpacePriority
- selectInSpaceFilesselectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
- selectInSpaceFilesselectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
- crossSpaceCompaction();
...
doCompactionInCrossSpacePriority
- crossSpaceCompaction();
- selectInSpaceFilesselectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
- selectInSpaceFilesselectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
selectInSpaceFilesselectAndSubmitInSpaceFiles(双向文件链表, isSeq): 返回是否选中文件做合并
- hasSelect = false
- 候选文件列表 Flist=[]
- 倒序遍历文件列表
- 如果 (任务已经足够多:currentTaskNum >= concurrent compaction_compactionconcurrent_thread)
|| (无需执行顺序合并: isSeq && !enable_seq_space_compaction)
|| (无需执行乱序合并: !isSeq && !enable_unseq_space_compaction)- return hasSelected
- 如果(当前文件大小 > target_tsfile_size || 当前文件正在合并 || 当前文件未关闭)
- 清空 FList;
- continue;
- 将当前文件加入 FList
- 如果 FList 内所有文件总大小大于 compaction_target_tsfile_size || FList 内文件数达到 compaction_candidate_file_num
- 将 FList 拷贝提交异步任务(空间内合并执行流程), FList=[],++currentTaskNum, hasSelected=true
- 如果 compaction_priority=balance
- break;
- 如果 (任务已经足够多:currentTaskNum >= concurrent compaction_compactionconcurrent_thread)
- return hasSelected
...
- hasSelected = false
- 如果 currentTaskNum >= concurrent compaction_compactionconcurrent_thread
|| !enable_cross_space_compaction- return hasSelected;
- 候选文件 F = []
- hasSelected = false
- for 顺序文件 s in 顺序文件链表 S
- if isOverlap(U, s) && memoryCost(F, s) < memory_budget
- 将 s 添加进 F 中
- hasSelected = true
- if isOverlap(U, s) && memoryCost(F, s) < memory_budget
- 将 F 提交异步任务(跨文件空间合并执行流程)
- ++currentTaskNum
- hasSelected = true
- 如果 compaction_priority=balance, break;
- return hasSelected
...