...
- 倒序遍历时间分区
- CompactionManager.doCompactionSchedule(TsFileManagement management,long 分区号)
合并管理器 CompactionManager(与 MManager 同级,系统启动时注册)
static volatile boolean isRecovered=false; (是否恢复完成)
recover():恢复
存储组为之前中断的每个合并执行任务,提交异步合并恢复任务
doCompactionSchedule(TsFileManagement management,long 分区号):执行合并调度任务
如果 isRecovered = true,执行 compactionSchedule();
合并调度任务 compactionSchedule
static long currentTaskNum=0;// 当前进行的合并任务个数
static compactionSchedule(TsFileManagement tsfileManagement, long 分区号):
- if: currentTaskNum >= compaction_concurrent_thread: return;
- else:
- tsfileManagement.readLock(resourceList)
- 顺序文件链表=tsfileManagement.getSeqList(分区号)
- 乱序文件链表=tsfileManagement.getUnSeqList(分区号)
- 如果 compaction_priority=balance
- doCompactionInBalancePriority();
- 如果 compaction_priority=inner_cross
- doCompactionInInnerSpacePriority();
- 如果 compaction_priority=cross_inner
- doCompactionInCrossSpacePriority();
- 如果 compaction_priority=balance
- tsfileManagement.readUnLock(resourceList);
doCompactionInBalancePriority
- hasSelected = true
- while (hasSelected && currentTaskNum < compaction_concurrent_thread)
- hasSelected = selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
- hasSelected |= selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
- hasSelected |= selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)
doCompactionInInnerSpacePriority
- selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
- selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
- selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)
doCompactionInCrossSpacePriority
- selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)
- selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
- selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
selectAndSubmitInSpaceFiles(双向文件链表, isSeq): 返回是否选中文件做合并
- hasSelect = false
- 候选文件列表 Flist=[]
- 倒序遍历文件列表
- 如果 (任务已经足够多:currentTaskNum >= compaction_concurrent_thread)
|| (无需执行顺序合并: isSeq && !enable_seq_space_compaction)
|| (无需执行乱序合并: !isSeq && !enable_unseq_space_compaction)- return hasSelected
- 如果(当前文件大小 > target_tsfile_size || 当前文件正在合并 || 当前文件未关闭)
- 清空 FList;
- continue;
- 将当前文件加入 FList
- 如果 FList 内所有文件总大小大于 compaction_target_tsfile_size || FList 内文件数达到 compaction_candidate_file_num
- 将 FList 拷贝提交异步任务(空间内合并执行流程), FList=[],++currentTaskNum, hasSelected=true
- 如果 compaction_priority=balance
- break;
- 如果 (任务已经足够多:currentTaskNum >= compaction_concurrent_thread)
- return hasSelected
选出来的顺序文件:已关闭、未正在合并、至少与一个选出的乱序文件重叠
选出来的乱序文件:已关闭、未正在合并、所有与此乱序文件重叠的顺序文件(至少一个)都被选中
selectAndSubmitCrossSpaceFiles(顺序文件链表 S, 乱序文件链表 U): 返回是否选中文件做合并
- hasSelected = false
- 如果 currentTaskNum >= compaction_concurrent_thread
|| !enable_cross_space_compaction- return hasSelected;
- 候选乱序文件 UF = []
- 候选顺序文件 SF = []
- hasSelected = false
- for 乱序文件 u in 乱序文件链表 U
- if (u 未关闭 || u 正在合并)
- if submitIfNotEmpty(SF, UF):
- 如果 compaction_priority=balance, break;
- if ( 线程池满 ) break;
- continue;
- if submitIfNotEmpty(SF, UF):
- 临时选中顺序文件 TmpSF = []
- continueToNextUnseqFile = false
- for 顺序文件 s in 顺序文件链表 S
- if isOverlap(u, s)
- if s 未关闭 || s 正在合并
- if submitIfNotEmpty(SF, UF):
- 如果 compaction_priority=balance, return hasSelected;
- continueToNextUnseqFile = true; break
- if submitIfNotEmpty(SF, UF):
- else
- TmpSF.add(s)
- if s 未关闭 || s 正在合并
- if isOverlap(u, s)
- if continueToNextUnseqFile
- continue
- SF.addAll(tmpSF), UF.add(u)
- if UF.size() >= compaction_cross_space_max_select_unseq_file_num
- if submitIfNotEmpty(SF, UF)
- if compaction_priority=balance, break
- if ( 线程池满 ) break;
- if submitIfNotEmpty(SF, UF)
- if (u 未关闭 || u 正在合并)
- submitIfNotEmpty(SF, UF)
- return hasSelected
合并执行任务
空间内合并执行流程
跨文件空间合并执行流程
合并恢复任务
顺序空间合并执行流程
乱序空间合并执行流程
跨文件空间合并执行流程
合并任务管理器CompactionTaskManager
...