Table of Contents |
---|
设计原则
减少合并任务提交的入口数量
能够在原始写入的文件大小不一的情况下(如初始写出的 TsFile 大小为 100M,10k,20M),控制合并目标文件大小100M,10k,20M),控制合并目标文件大小和 Chunk 大小
将 TsFileManagement 里管理 TsFileResource 的数据结构与 合并机制分离,固定管理 TsFileResource 的数据结构合并机制分离
能够做到时间分区内部合并任务的并行
能够解决以下场景:
...
- 管理顺序文件时,按照数据时间戳递增排列
- 管理乱序文件时,按照文件版本号由低到高排列
- 移除某些位置的文件,并且在原位加入合并后的文件
待改名:
...
VirtualStorageGroupManager → StorageGroupProsessor
StorageGroupProsessor → VirtualStorageGroupProsessor 虚拟存储组启动定时任务:
合并的整体流程
- 倒序遍历时间分区
- CompactionManager.doCompactionSchedule(TsFileManagement management,long 分区号)
合并管理器 CompactionManager(与 MManager 同级,系统启动时注册)
static volatile boolean isRecovered=false; (是否恢复完成)
recover():恢复
存储组为之前中断的每个合并执行任务,提交异步合并恢复任务
doCompactionSchedule(TsFileManagement management,long 分区号):执行合并调度任务
如果 isRecovered = true,执行 compactionSchedule();
合并调度任务 compactionSchedule
static long currentTaskNum=0;// 当前进行的合并任务个数
static compactionSchedule(TsFileManagement tsfileManagement, long 分区号):
- if: currentTaskNum >= compaction_concurrent_thread: return;
- else:
- tsfileManagement.readLock(resourceList)
- 顺序文件链表=tsfileManagement.getSeqList(分区号)
- 乱序文件链表=tsfileManagement.getUnSeqList(分区号)
- 如果 compaction_priority=balance
- doCompactionInBalancePriority();
- 如果 compaction_priority=inner_cross
- doCompactionInInnerSpacePriority();
- 如果 compaction_priority=cross_inner
- doCompactionInCrossSpacePriority();
- 如果 compaction_priority=balance
- tsfileManagement.readUnLock(resourceList);
doCompactionInBalancePriority
- hasSelected = true
- while (hasSelected && currentTaskNum < compaction_concurrent_thread)
- hasSelected = selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
- hasSelected |= selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
- hasSelected |= selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)
doCompactionInInnerSpacePriority
- selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
- selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
- selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)
doCompactionInCrossSpacePriority
- selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)
- selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
- selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
selectAndSubmitInSpaceFiles(双向文件链表, isSeq): 返回是否选中文件做合并
- hasSelect = false
- 候选文件列表 Flist=[]
- 倒序遍历文件列表
- 如果 (任务已经足够多:currentTaskNum >= compaction_concurrent_thread)
|| (无需执行顺序合并: isSeq && !enable_seq_space_compaction)
|| (无需执行乱序合并: !isSeq && !enable_unseq_space_compaction)- return hasSelected
- 如果(当前文件大小 > target_tsfile_size || 当前文件正在合并 || 当前文件未关闭)
- 清空 FList;
- continue;
- 将当前文件加入 FList
- 如果 FList 内所有文件总大小大于 compaction_target_tsfile_size || FList 内文件数达到 compaction_candidate_file_num
- 将 FList 拷贝提交异步任务(空间内合并执行流程), FList=[],++currentTaskNum, hasSelected=true
- 如果 compaction_priority=balance
- break;
- 如果 (任务已经足够多:currentTaskNum >= compaction_concurrent_thread)
- return hasSelected
选出来的顺序文件:已关闭、未正在合并、至少与一个选出的乱序文件重叠
选出来的乱序文件:已关闭、未正在合并、所有与此乱序文件重叠的顺序文件(至少一个)都被选中
selectAndSubmitCrossSpaceFiles(顺序文件链表 S, 乱序文件链表 U): 返回是否选中文件做合并
- hasSelected = false
- 如果 currentTaskNum >= compaction_concurrent_thread
|| !enable_cross_space_compaction- return hasSelected;
- 候选乱序文件 UF = []
- 候选顺序文件 SF = []
- hasSelected = false
- for 乱序文件 u in 乱序文件链表 U
- if (u 未关闭 || u 正在合并)
- if submitIfNotEmpty(SF, UF):
- 如果 compaction_priority=balance, break;
- if ( 线程池满 ) break;
- continue;
- if submitIfNotEmpty(SF, UF):
- 临时选中顺序文件 TmpSF = []
- continueToNextUnseqFile = false
- for 顺序文件 s in 顺序文件链表 S
- if isOverlap(u, s)
- if s 未关闭 || s 正在合并
- if submitIfNotEmpty(SF, UF):
- 如果 compaction_priority=balance, return hasSelected;
- continueToNextUnseqFile = true; break
- if submitIfNotEmpty(SF, UF):
- else
- TmpSF.add(s)
- if s 未关闭 || s 正在合并
- if isOverlap(u, s)
- if continueToNextUnseqFile
- continue
- SF.addAll(tmpSF), UF.add(u)
- if UF.size() >= compaction_cross_space_max_select_unseq_file_num
- if submitIfNotEmpty(SF, UF)
- if compaction_priority=balance, break
- if ( 线程池满 ) break;
- if submitIfNotEmpty(SF, UF)
- if (u 未关闭 || u 正在合并)
- submitIfNotEmpty(SF, UF)
- return hasSelected
合并执行任务
空间内合并执行流程
跨文件空间合并执行流程
合并恢复任务
顺序空间合并执行流程
乱序空间合并执行流程
跨文件空间合并执行流程
合并任务管理器CompactionTaskManager
该类负责管理合并任务的运行,例如把合并任务加入到等待队列里,并定时从等待队列里拿取高优先级的线程进行执行。
- 重要属性
- compactionTaskQueue:合并线程的等待队列,可容纳的任务数量为1000个,若满了则会踢掉低优先级的线程
- 两个线程池
- taskExecutionPool:合并任务的执行线程池,系统预设的线程数量为10个
- compactionTaskSubmissionThreadPool:定时执行线程池,该池里的线程负责定时从等待队列里拿取一个合并任务进行执行。系统预设的线程数量为1个。
- currentTaskNum:当前正在执行合并任务的线程数量
- Task_Submit_Interval:每个合并线程执行的时间间隔,系统预设值为1秒
合并任务管理器CompactionTaskManager
该类负责管理合并任务的运行,例如把合并任务加入到等待队列里,并定时从等待队列里拿取高优先级的线程进行执行。
- 重要属性
- candidateCompactionTaskQueue:合并线程的等待队列,可容纳的任务数量为1000个,若满了则会踢掉低优先级的线程
- 两个线程池
- taskExecutionPool:合并任务的执行线程池,系统预设的线程数量为10个
- compactionTaskSubmissionThreadPool:定时执行线程池,该池里的线程负责定时从等待队列里拿取一个合并任务进行执行。系统预设的线程数量为1个。
- runningCompactionTaskList:正在执行合并的任务列表
- currentTaskNum:当前正在执行合并任务的线程数量
- Task_Submit_Interval:每个合并线程执行的时间间隔,系统预设值为1秒
- 合并任务的优先级
candidateCompactionTaskQueue队列里,任务是根据自定义的优先级进行排序的,优先级高的任务会被优先执行。当该队列满时,若添加新的合并任务,则会把优先级最低的给踢掉。下面讲解合并任务的优先级定义规则,在CompactionTaskComparator类里:
- 若两个任务一个是空间内合并,另一个是跨空间合并,且系统预设的合并优先级不是balance
- 若系统预设的合并执行优先级是inner_cross,则空间内合并的任务的优先级较高
- 若系统预设的合并执行优先级是cross_inner,则跨空间合并的任务的优先级较高
- 若两个任务都是空间内合并
顺序空间内合并 > 乱序空间内合并
优先合并【低层文件(空间内合并次数低的)】:若两个任务的平均每个待合并文件的空间内合并次数不相等,则平均每个文件空间内合并次数低的任务的优先级较高(因为这样可以减少写放大)
优先合并【文件数量多的】:可以减少大量文件
优先合并【待合并文件总大小小的】:(因为小文件的合并速度较快)
优先合并【新文件】:若两个任务的待合并文件的最大version不同,则较大version的任务的优先级高(因为我们想要优先合并最近落盘的文件)
- 若两个任务都是跨空间合并
优先合并【顺序文件少的】:若两个任务的待合并的顺序文件的数量不一样,则顺序文件数量较少的任务的优先级高(因为在合并过程中占用更少的内存)
乱序文件数量较多的任务的优先级较高(因为会减少更多的乱序文件)
- 若两个任务一个是空间内合并,另一个是跨空间合并,且系统预设的合并优先级不是balance
合并任务调度器CompactionSchedule
...
- tryToSubmitCrossSpaceCompactionTask (顺序文件链表, 乱序文件链表,…)
- tryToSubmitInnerSpaceCompactionTask(顺序文件链表,isSeq=true,…)
- tryToSubmitInnerSpaceCompactionTask (乱序文件链表,isSeq=false,…)
- tryToSubmitInnerSpaceCompactionTask (文件链表, isSeq, …): 返回是否选中文件做合并
- if ((无需执行顺序合并: isSeq && !enable_seq_space_compaction) || (无需执行乱序合并: !isSeq && !enable_unseq_space_compaction))
- return false
- 获取空间内合并的文件选择器AbstractInnerSpaceCompactionSelector
- return innerSpaceCompactionSelector.selectAndSubmit()
- tryToSubmitCorssSpaceCompactionTask (顺序文件链表, 乱序文件链表, …): 返回是否选中文件做合并
- if (无需跨空间合并:!enable_cross_space_compaction)
- return false
- 获取跨空间合并的文件选择器AbstractCrossSpaceCompactionSelector
- return crossSpaceCompactionSelector.selectAndSubmit()
...
合并任务的文件选择器AbstractCompactionSelector
根据一定的策略选择待被合并的一批批TsFile文件,并为每批TsFile文件创建一个合并任务放入合并任务管理器CompactionTaskManager的任务等待队列里。
空间内合并是从低层向高层搜索某层若满足连续10个文件或文件总大小超过2G则将该批文件封装到一个合并任务里,因此一次搜索某层可能出现好几批待合并文件,分别封装成好几个合并任务;跨空间合并则是针对该虚拟存储组下该时间分区下的所有乱序文件与其有Overlap的顺序文件都封装在一个合并任务里,具体判断是否有Overlap的方法是:遍历每个乱序文件里的每个ChunkGroup,在将该乱序ChunkGroup的结束时间依次与每个顺序文件的该设备ChunkGroup的结束时间做比较,若小于则说明该乱序文件与当前遍历到的顺序文件有Overlap。
无论是空间内还是跨空间合并在选择文件的时候都没有读取对应的.mods文件判断数据是否被删除。
空间内合并的文件选择器AbstractInnerSpaceCompactionSelector
...
- 创建临时选中的文件列表selectedFileList和选中的文件总大小selectedFileSize
- 创建Boolean变量shouldContinueToSearch=true。//当在该层找到了一批以上的待合并的文件后则会把该变量设为false,示意文件选择器不再往高层
- 依次遍历该虚拟存储组的该时间分区下的所有顺序文件,
- 若该顺序文件的空间内合并层级不等于currentLevel
- 清空selectedFileList和selectedFileSize,遍历下一个顺序文件
- 否则该顺序文件的空间内合并层级等于currentLeve,则把他加入到selectedFileList,并且selectedFileSize加上该文件大小
- 若 (被选中的文件数量>=系统预设值为10个) || (被选中的文件的总大小>=系统预设值为2GB)
- 把该批选中的文件列表及其总文件大小放入taskPriorityQueue队列里。并清空selectedFileList和selectedFileSize,将shouldContinueToSearch设为设为false
- 创建临时选中的文件列表selectedFileList和选中的文件总大小selectedFileSize
- 若该顺序文件的空间内合并层级不等于currentLevel
...
- 重要属性
- sequenceFileResources:顺序文件链表,存放着该虚拟存储组下的该时间分区下的所有顺序文件。
- unsequenceFileResources:乱序文件链表,存放着该虚拟存储组下的该时间分区下的所有乱序文件。
- taskFactory:任务工厂,负责为每批待合并的TsFile创建一个合并任务
- 主要流程
...
- 从虚拟存储组下该时间分区里的所有顺序和乱序文件中,选取“候选文件”:
- 候选顺序文件:过滤掉isNotExist、isDelete、isOverTTL的文件。
- 候选乱序文件:从第一个文件开始
- 若遇到isNotExist || isDelete || isOverTTL 的则跳过
- 若遇到 isCompacting || isCompactionCandidate || isNotClosed 则返回
- 否则添加到候选乱序文件中
- 在指定的时间内(30s)选取“源文件”,即从候选文件中选取与乱序文件有重叠的顺序文件:
- 遍历乱序文件,寻找与该乱序文件有重叠的所有顺序文件:(使用loose评估内存)
- 若顺序文件中不存在isCompacting、isCompactionCandidate的,则检查并用评估它们在合并中的内存开销是否超过阈值,若否,则将该批乱序和对应的顺序文件放入源文件中
- 若顺序文件中存在一个及以上isCompacting、isCompactionCandidate的,则丢弃该乱序和对应的顺序文件,并停止选取源文件。
- 若选取的乱序文件数量为0,则使用tight评估重新选取源文件
- 遍历乱序文件,寻找与该乱序文件有重叠的所有顺序文件:(使用loose评估内存)
- 若源顺序文件或者源乱序文件列表中有一个为0,则放弃此合并任务。否则对所有的源文件set isCompactionCandidate为true,并把该任务丢进等待队列里。
RewriteCrossSpaceCompactionSelector选择器
- 重要方法
1)selectAndSubmit():返回是否找到文件并提交合并任务
...
文件选择器ICrossSpaceMergeFileSelector
...
RewriteCompactionFileSelector
下面详细介绍选择待合并文件的方法:
- select()方法:返回的数组里第一个元素是待合并的顺序文件列表,第二个元素是待合并的乱序文件列表
...
具体流程:将与该unseqFile乱序文件有Overlap的并且还未被此次合并任务文件选择器选中的顺序文件的索引放入tmpSelectedSeqFiles列表里。具体判断是否有Overlap的做法是:依次遍历获取乱序文件的每个设备ChunkGroup,判断所有还未被此次合并任务选中的顺序文件的该设备ChunkGroup是否有与乱序的ChunkGroup重叠,有的话则选中此顺序文件。
...
合并任务的恢复
在IOTDB-Server重启后,每个StorageGroup会执行顺序空间和乱序空间内的合并恢复和跨空间的合并恢复。
在StorageGroupProcessor类里:
1)空间内合并恢复
recoverInnerSpaceCompaction(isSeq)方法:对该存储组下的每个时间分区里的每个合并日志创建一个空间内合并恢复线程SizeTieredCompactionRecoverTask,并同步执行恢复流程。
2)跨空间合并恢复
recoverCrossSpaceCompaction()方法:对该存储组下的每个时间分区里的每个合并日志创建一个空间内合并恢复线程RewriteCrossCompactionRecoverTask,并同步执行恢复流程。
合并任务的定时执行
当server启动后,每个虚拟存储组首先会执行顺序空间和乱序空间内的合并恢复和跨空间的合并恢复,执行完后就会回调submitTimedCompactionTask()方法以定时执行合并任务,具体操作如下:
(1) 从该虚拟存储组下的每个时间分区里依次使用合并任务调度器CompactionSchedule根据合并策略去选择一批批待合并文件,并为每批文件创建一个合并任务线程放进CompactionTaskManager里的线程等待队列里
(2) 从等待队列里获取一个合并线程并执行
合并的加锁流程
- 选择待合并的源文件,并设置setCompactionCandidate(true),将它们封装入一个合并任务里,并放入等待队列中。
- 从等待队列里拿出来,依次对每个源文件加读锁,并检查该源文件是否Valid,若isValid则setCompacting(true),否则释放所有源文件的读锁并setCompacting(false)
- 执行合并,并将临时目标文件移动成最终目标.tsfile文件,生成.resource文件,合并compactionMods文件,更新内存:
- 给TsFileManager加写锁
- 更新 TsFileResourceManager,使用Sychronized移除源文件的 TsFileResource 并增加目标文件的 TsFileResource
- 更新TsFileManager的TsFileResourceList,移除源文件的tsfileResource,插入目标文件的tsFileResource
- 给TsFileManager释放写锁
- 依次对每个源文件释放读锁、加写锁,删除源文件和日志。
- 对每个源文件释放写锁、setCompacting(false)、setCompactionCandidate(false).