Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents


设计原则

控制合并任务提交的入口数量

能够在原始写入的文件大小不一的情况下,控制合并目标文件大小(如初始写出的 能够在原始写入的文件大小不一的情况下(如初始写出的 TsFile 大小为 100M,10k,20M)100M,10k,20M),控制合并目标文件大小和 Chunk 大小

将 TsFileManagement 里管理 TsFileResource 的数据结构与合并机制分离,固定管理 TsFileResource 的数据结构的数据结构与 合并机制分离

能够做到时间分区内部合并任务的并行

能够解决以下场景:

...

顺序空间:顺序数据文件所在的空间

乱序空间:乱序数据文件所在的空间


顺序合并(Seq 顺序空间合并(Seq Compaction):在顺序空间内部进行文件的合并(SizeTired Compaction)

乱序合并(Unseq 乱序空间合并(Unseq Compaction):在乱序空间内部进行文件的合并(SizeTired Compaction)

顺乱序合并跨空间合并(Cross Compaction):将乱序空间的文件合并到顺序空间

...

合并恢复任务(重启时执行一次):将中断的合并执行任务继续执行

...


合并调度的参数

enable_seq_space_compaction=true

...

enable_cross_space_compaction=true (等价之前的 enable_unseq_compaction)

inner_compaction_strategy=SIZE_TIRED_COMPACTION (空间内合并策略)

cross_compaction_strategy=INPLACE(跨空间合并策略)

compaction_priority=inner_cross(优先执行空间内合并,快速减少文件数)、cross_inner(优先执行跨文件空间合并,消除乱序文件)、balance(平衡减少文件数和消除乱序文件)

compaction_concurrent_compaction_thread=50(合并执行任务的并行度)

compaction_interval=10000(定时合并任务的提交间隔,单位 ms)10000(合并调度任务的执行间隔,单位 ms)

compaction_candidate_file_num=100 (候选合并文件个数)

compaction_target_tsfile_size=2000000000 (合并的目标文件大小)

compaction_cross_space_max_select_unseq_file_num=100(跨文件空间合并一次最多提交的乱序文件个数)

TsFileManagement 文件管理器(每个虚拟存储组一个)

...

  • 管理顺序文件时,按照数据时间戳递增排列
  • 管理乱序文件时,按照文件版本号由低到高排列
  • 移除某些位置的文件,并且在原位加入合并后的文件


待改名:

...

VirtualStorageGroupManager → StorageGroupProsessor

StorageGroupProsessor → VirtualStorageGroupProsessor 虚拟存储组启动定时任务:


合并的整体流程

  • 倒序遍历时间分区
    • CompactionManager.compactionSchedule(TsFileManagement management,long 分区号)

全局合并管理器(与 MManager 同级,系统启动时注册)

static volatile boolean isRecovered=false;  (是否恢复完成)

recover():恢复

存储组为之前中断的每个合并执行任务,提交异步合并恢复任务

doCompaction():执行合并调度任务

如果 isRecovered = true,执行 compactionSchedule();

合并调度任务

static  long  currentTaskNum=0;// 当前进行的合并任务个数

Image Added

合并任务管理器CompactionTaskManager

该类负责管理合并任务的运行,例如把合并任务加入到等待队列里,并定时从等待队列里拿取高优先级的线程进行执行。

 

  1. 重要属性
    1. candidateCompactionTaskQueue:合并线程的等待队列,可容纳的任务数量为1000个,若满了则会踢掉低优先级的线程
    2. 两个线程池
      1. taskExecutionPool:合并任务的执行线程池,系统预设的线程数量为10
      2. compactionTaskSubmissionThreadPool:定时执行线程池,该池里的线程负责定时从等待队列里拿取一个合并任务进行执行。系统预设的线程数量为1个。
    3. runningCompactionTaskList:正在执行合并的任务列表
    4. currentTaskNum:当前正在执行合并任务的线程数量
    5. Task_Submit_Interval:每个合并线程执行的时间间隔,系统预设值为1
  2. 合并任务的优先级

           candidateCompactionTaskQueue队列里,任务是根据自定义的优先级进行排序的,优先级高的任务会被优先执行。当该队列满时,若添加新的合并任务,则会把优先级最低的给踢掉。下面讲解合并任务的优先级定义规则,在CompactionTaskComparator类里:

    1. 若两个任务一个是空间内合并,另一个是跨空间合并,且系统预设的合并优先级不是balance
      1. 若系统预设的合并执行优先级是inner_cross,则空间内合并的任务的优先级较高
      2. 若系统预设的合并执行优先级是cross_inner,则跨空间合并的任务的优先级较高
    2. 若两个任务都是空间内合并
      1. 顺序空间内合并 > 乱序空间内合并

      2. 优先合并【低层文件(空间内合并次数低的)】:若两个任务的平均每个待合并文件的空间内合并次数不相等,则平均每个文件空间内合并次数低的任务的优先级较高(因为这样可以减少写放大)

      3. 优先合并【文件数量多的】:可以减少大量文件

      4. 优先合并【待合并文件总大小小的】:(因为小文件的合并速度较快)

      5. 优先合并【新文件】:若两个任务的待合并文件的最大version不同,则较大version的任务的优先级高(因为我们想要优先合并最近落盘的文件)

    3. 若两个任务都是跨空间合并
      1. 优先合并【顺序文件少的】:若两个任务的待合并的顺序文件的数量不一样,则顺序文件数量较少的任务的优先级高(因为在合并过程中占用更少的内存)

      2. 乱序文件数量较多的任务的优先级较高(因为会减少更多的乱序文件)


合并任务调度器CompactionSchedule

该类用于根据系统预设的合并优先级策略去创建对应的文件选择器,并使用文件选择器去选择待合并的一批批TsFile并为每一批文件创建一个合并任务(空间内合并、跨空间合并)线程放进CompactionTaskManager的等待队列里。

 

  1. 方法详解

1) static scheduleCompaction

...

(TsFileManagement tsfileManagement, long 分区号)

  • if: currentTaskNum >= concurrent_compaction_thread: return;
  • else:
  • tsfileManagement.
    • readLock(resourceList)
    • 顺序文件链表=tsfileManagement.getSeqList(分区号)
    • 乱序文件链表=tsfileManagement.getUnSeqList(分区号)
      • 如果 compaction_priority=balance
      • doCompactionInBalancePriority
            • doCompactionBalancePriority(); 
          • 如果 compaction_priority=inner_cross
          • doCompactionInInnerSpacePriority
                • doCompactionInnerCrossPriority();
          • 如果 compaction
              • 如果compaction_priority=cross_inner
              • doCompactionInCrossSpacePriority
                    • doCompactionCrossInnerPriority();
              • tsfileManagement.
                  • readUnLock(resourceList);

              ...

              2) doCompactionBalancePriority

                hasSelected
                • taskSubmitted = true
                • while (
              • hasSelected
                • taskSubmitted && currentTaskNum <
              • concurrent_compaction_thread
                • concurrentCompactionThread)
                  • hasSelected
                      • taskSubmitted =
                  • selectInSpaceFiles
                      • tryToSubmitInnerSpaceCompactionTask(
                  • 顺序文件链表,isSeq
                      • 顺序文件链表,isSeq=true)
                      • hasSelected |=
                  • selectInSpaceFiles
                      • tryToSubmitInnerSpaceCompactionTask (
                  • 乱序文件链表,isSeq
                      • 乱序文件链表,isSeq=false)
                      • hasSelected |=
                  • crossSpaceCompaction
                      • tryToSubmitCrossSpaceCompactionTask (顺序文件链表, 乱序文件链表,)
                  • ;

                  ...

                  3) doCompactionInnerCrossPriority

                    selectInSpaceFiles
                    • tryToSubmitInnerSpaceCompactionTask(
                  • 顺序文件链表,isSeq
                    • 顺序文件链表,isSeq=true)
                  • selectInSpaceFiles
                    • tryToSubmitInnerSpaceCompactionTask (
                  • 乱序文件链表,isSeq
                    • 乱序文件链表,isSeq=false)
                  • crossSpaceCompaction
                    • tryToSubmitCrossSpaceCompactionTask (顺序文件链表, 乱序文件链表,)
                  • ;

                  ...

                  4)  doCompactionCrossInnerPriority

                    crossSpaceCompaction
                    • tryToSubmitCrossSpaceCompactionTask (顺序文件链表, 乱序文件链表,)
                  • selectInSpaceFiles
                    • tryToSubmitInnerSpaceCompactionTask(
                  • 顺序文件链表,isSeq
                    • 顺序文件链表,isSeq=true)
                  • selectInSpaceFiles
                    • tryToSubmitInnerSpaceCompactionTask (
                  • 乱序文件链表,isSeq
                    • 乱序文件链表,isSeq=false)

                      ...

                          • tryToSubmitInnerSpaceCompactionTask (

                      ...

                        倒序遍历文件列表如果  (任务已经足够多:currentTaskNum >= concurrent_compaction_thread)
                              ||
                          • 文件链表, isSeq, …): 返回是否选中文件做合并
                      • hasSelect = false
                      • 候选文件列表 Flist=[]
                        • if ((无需执行顺序合并: isSeq && !enable_seq_space_compaction)

                      •      
                        • || (无需执行乱序合并: !isSeq && !enable_unseq_space_compaction))
                          • return
                      • hasSelected
                      • 如果(当前文件大小 > target_tsfile_size || 当前文件正在合并 || 当前文件未关闭),
                        • 清空 FList,
                        • continue;
                      • 将当前文件加入 FList
                      • 如果 FList 内所有文件总大小大于 target_tsfile_size
                        • 将 FList 拷贝提交异步任务(空间内合并执行流程), FList=[],++currentTaskNum, hasSelected=true
                        • 如果 compaction_priority=balance
                          • break;
                      • return hasSelected

                      crossSpaceCompaction(): 返回是否选中文件做合并

                      • hasSelected = false
                      • 如果 currentTaskNum >= concurrent_compaction_thread
                               || !enable_cross_space_compaction
                        • return hasSelected;
                      • 候选文件 F = []
                      • hasSelected = false
                      • for 顺序文件 s in  顺序文件链表 S
                        • if isOverlap(U, s) && memoryCost(F, s) < memory_budget
                          • 将 s 添加进 F 中
                          • hasSelected = true
                      • 将 F 提交异步任务(跨文件空间合并执行流程)
                        • ++currentTaskNum
                        • hasSelected = true
                        • 如果 compaction_priority=balance, break;
                      • return hasSelected

                      合并执行任务

                      空间内合并执行流程

                      跨文件空间合并执行流程

                      合并恢复任务

                      顺序空间合并执行流程

                      乱序空间合并执行流程

                          • false
                        • 获取空间内合并的文件选择器AbstractInnerSpaceCompactionSelector
                        • return innerSpaceCompactionSelector.selectAndSubmit()
                          • tryToSubmitCorssSpaceCompactionTask (顺序文件链表, 乱序文件链表, …): 返回是否选中文件做合并
                        • if (无需跨空间合并:!enable_cross_space_compaction)
                          • return false
                        • 获取跨空间合并的文件选择器AbstractCrossSpaceCompactionSelector
                        • return crossSpaceCompactionSelector.selectAndSubmit()

                      合并任务的文件选择器AbstractCompactionSelector

                      根据一定的策略选择待被合并的一批批TsFile文件,并为每批TsFile文件创建一个合并任务放入合并任务管理器CompactionTaskManager的任务等待队列里。

                      空间内合并是从低层向高层搜索某层若满足连续10个文件或文件总大小超过2G则将该批文件封装到一个合并任务里,因此一次搜索某层可能出现好几批待合并文件,分别封装成好几个合并任务;跨空间合并则是针对该虚拟存储组下该时间分区下的所有乱序文件与其有Overlap的顺序文件都封装在一个合并任务里,具体判断是否有Overlap的方法是:遍历每个乱序文件里的每个ChunkGroup,在将该乱序ChunkGroup的结束时间依次与每个顺序文件的该设备ChunkGroup的结束时间做比较,若小于则说明该乱序文件与当前遍历到的顺序文件有Overlap。

                      无论是空间内还是跨空间合并在选择文件的时候都没有读取对应的.mods文件判断数据是否被删除。

                      空间内合并的文件选择器AbstractInnerSpaceCompactionSelector

                      1. 重要属性
                        1. tsFileResources:文件链表,存放着该虚拟存储组下的该时间分区下的所有顺序或者乱序文件。
                        2. sequence:顺序or乱序
                        3. taskFactory:任务工厂,负责为每批待合并的TsFile创建一个合并任务

                      SizeTieredCompactionSelector选择器

                      1. 重要方法

                      1) selectAndSubmit():返回是否找到文件并提交合并任务

                      //该方法从0层向最高层,逐层搜索是否有待被合并的一批批文件,若某层上有一批以上的待合并文件,则不再向高层搜索,并且为每批文件创建合并线程,并放入合并任务管理器CompactionTaskManager的任务等待队列里

                        • readLock();
                        • 创建任务优先级队列taskPriorityQueue//负责存放每批待合并文件的TsFileResource列表和文件的总大小
                        • 获取所有的顺序或者乱序文件里空间内合并的最大层数maxLevel=searchFileMaxLevel();
                        • 从第0层开始向最高层,针对每一层执行如下文件搜索
                        • if!selectLevelTask(currentLevel,taskPriorityQueue)//若当前层搜索到一批以上待合并的文件,则把他们的TsFileResource列表和总文件大小放入taskPriorityQueue队列里,且不继续向高层搜索文件
                          • break
                        •  遍历taskPriorityQueue里每个元素 
                          • 使用taskFactory对任务优先级队列里的该批文件创建合并线程,并放入合并任务管理器CompactionTaskManager的任务等待队列里

                       

                      2) selectLevelTask(currentLevel,taskPriorityQueue) :返回是否要向高层level继续搜索  

                      //该方法搜索第level层上的所有文件,若该层上有连续的文件满足系统预设的条件(数量超过10个或者总文件大小超过2G)则为该批文件创建一个合并任务放进taskPriorityQueue队列里,并继续搜索下一批。若在该层上搜索到至少一批以上待合并文件,则返回false(示意不再向高层搜索),否则返回true。

                        • 创建临时选中的文件列表selectedFileList和选中的文件总大小selectedFileSize
                        • 创建Boolean变量shouldContinueToSearch=true//当在该层找到了一批以上的待合并的文件后则会把该变量设为false,示意文件选择器不再往高层
                        • 依次遍历该虚拟存储组的该时间分区下的所有顺序文件,
                          • 若该顺序文件的空间内合并层级不等于currentLevel
                            • 清空selectedFileListselectedFileSize,遍历下一个顺序文件
                          • 否则该顺序文件的空间内合并层级等于currentLeve,则把他加入到selectedFileList,并且selectedFileSize加上该文件大小
                          • (被选中的文件数量>=系统预设值为10) || (被选中的文件的总大小>=系统预设值为2GB
                            • 把该批选中的文件列表及其总文件大小放入taskPriorityQueue队列里。并清空selectedFileListselectedFileSize,将shouldContinueToSearch设为false
                          • 创建临时选中的文件列表selectedFileList和选中的文件总大小selectedFileSize

                       

                      跨空间合并的文件选择器AbstractCrossSpaceCompactionSelector

                      1. 重要属性
                        1. sequenceFileResources:顺序文件链表,存放着该虚拟存储组下的该时间分区下的所有顺序文件。
                        2. unsequenceFileResources:乱序文件链表,存放着该虚拟存储组下的该时间分区下的所有乱序文件。
                        3. taskFactory:任务工厂,负责为每批待合并的TsFile创建一个合并任务
                      2. 主要流程
                        • 从虚拟存储组下该时间分区里的所有顺序和乱序文件中,选取“候选文件”:
                          • 候选顺序文件:过滤掉isNotExistisDeleteisOverTTL的文件。
                          • 候选乱序文件:从第一个文件开始
                            • 若遇到isNotExist || isDelete || isOverTTL 的则跳过
                            • 若遇到 isCompacting || isCompactionCandidate || isNotClosed 则返回
                            • 否则添加到候选乱序文件中
                        • 在指定的时间内(30s)选取“源文件”,即从候选文件中选取与乱序文件有重叠的顺序文件:
                          • 遍历乱序文件,寻找与该乱序文件有重叠的所有顺序文件:(使用loose评估内存)
                            • 若顺序文件中不存在isCompactingisCompactionCandidate的,则检查并用评估它们在合并中的内存开销是否超过阈值,若否,则将该批乱序和对应的顺序文件放入源文件中
                            • 若顺序文件中存在一个及以上isCompactingisCompactionCandidate的,则丢弃该乱序和对应的顺序文件,并停止选取源文件。
                          • 若选取的乱序文件数量为0,则使用tight评估重新选取源文件
                        • 若源顺序文件或者源乱序文件列表中有一个为0,则放弃此合并任务。否则对所有的源文件set isCompactionCandidatetrue,并把该任务丢进等待队列里。

                       

                      RewriteCrossSpaceCompactionSelector选择器

                      1. 重要方法

                      1)selectAndSubmit():返回是否找到文件并提交合并任务

                      //该方法根据跨空间合并的文件选择策略创建具体的文件选择器,使用文件选择器选取该虚拟存储组下该时间分区下的所有待被合并的乱序文件和顺序文件,并为他们创建一个合并任务放入合并任务管理器CompactionTaskManager的任务等待队列里

                        • 若 (当前正在合并的线程数量>=系统预设的数量为10)||(系统预设为不允许跨空间合并:!enableCrossSpaceCompaction||(当前虚拟存储组的当前时间分区存在合并任务) //有个基类,跨空间和空间内合并的文件选择器的基类
                          • return false
                        • 若该存储组的该时间分区下不存在顺序文件或乱序文件
                          • return false   //注意:当只有乱序文件而没有顺序文件时,则应该移动或者重写此乱序文件(可能顺序文件的数据被删掉了,而乱序文件还存在)
                        • 若乱序文件的数量>系统预设的合并任务最大的文件数量为10
                          • 截取前几个乱序文件,数量为maxCompactionCandidateFileNum
                        • 创建跨空间合并的资源管理器mergeResource
                        • 根据系统预设的跨空间合并的文件选择策略(Max_File_Num或者Max_Series_Num)创建具体的文件选择器ICrossSpaceMergeFileSelector
                        • crossSpaceMergeFileSelector.select(),使用具体的文件选择器选取所有的待被合并的乱序文件和顺序文件,装入mergeFiles数组里,第一个元素是待合并的顺序文件列表,第二个元素是待合并的乱序文件列表
                        • mergeFiles长度为0
                          • return false
                        • mergeResource.clear()
                        • mergeResource.setCacheDeviceMeta()
                        • 使用任务工厂taskFactory和选中待合并的乱序和顺序文件创建合并任务,并添加到合并任务管理器CompactionTaskManager的任务等待队列里
                        • return true


                      文件选择器ICrossSpaceMergeFileSelector

                      RewriteCompactionFileSelector

                      下面详细介绍选择待合并文件的方法:

                      1.   select()方法:返回的数组里第一个元素是待合并的顺序文件列表,第二个元素是待合并的乱序文件列表

                      1)具体流程:在系统预设的时间内(30秒),针对每个乱序文件查找与其Overlap且还未被此次合并任务文件选择器选中的顺序文件列表,每找到一个乱序文件及其对应的Overlap顺序文件列表后就预估他们进行合并可能增加的额外内存开销(优先使用loose估计),若未超过系统给合并线程预设的内存开销,则把他们放入到此合并任务选中的顺序和乱序文件里。更新该虚拟存储组下该时间分区的跨空间合并资源管理器里的顺序文件和乱序文件列表,移除其未被选中的文件的SequenceReader以清缓存。

                      2)计算合并某一乱序文件与其对应的Overlap的顺序文件会新增的内存开销的方法:

                      //loose估计出的大小会偏大、误差大;而tight估计出的大小精确到具体合并的有多少序列,误差小

                      (1)Loose估计:乱序文件占用的大小(即整个乱序文件大小,因为可能会读取出乱序文件里的所有Chunk+顺序文件占用的大小(由于每次只读取一个顺序文件,因此cost记录顺序文件里最大的metadata大小;写入的时候每个文件都会有metadata,因此cost还要把所有顺序文件的metadata加起来)

                      (2)Tight估计:计算获取查询该乱序文件里concurrentMergeNum个时间序列共要花费的可能最大内存空间,即(总文件大小*MaxChunkNum/totalChunkNum);计算获取查询该顺序文件里concurrentMergeNum个时间序列共要花费的可能最大内存空间,即(总索引区大小*MaxChunkNum序列最多含有的Chunk数量/totalChunkNum所有序列Chunk数量和)

                              优先使用loose估计内存的原因是速度快,可是估计结果偏大,误差大,可能导致估计的内存太大超过系统给合并任务预值的内存值,因此该合并任务就没有一个被选中待合并的乱序和对应Overlap的顺序文件。若发生这种情况,再使用tight估计内存来选取待合并文件,它所估计的内存精确到具体要访问几个序列,较精确,可是要访问文件磁盘上每个序列的具体Chunk数量,速度较慢。


                      2.  select(isTight)方法   

                        • 获取系统给合并任务的文件选择预设的时间阈值为30秒·       
                        • 遍历每个乱序文件,且当选择文件的耗时还未超过系统预设值时
                          • 当被选中待合并的顺序文件的数量 != 所有顺序文件的数量
                            • selectOverLappedSeqFiles(unseqFile)选中与该乱序文件Overlap的顺序文件
                          • 若该乱序文件或者与其Overlap的某一顺序文件未关闭或正在merging§  清空临时变量,遍历下个乱序文件
                          • 计算合并此乱序文件与其对应Overlap的顺序文件列表可能会新增的内存开销
                          • 若 合并此乱序文件与其对应Overlap的顺序文件列表可能会新增的内存开销加上原有的其他乱序文件进行合并的开销不会超过系统给合并线程预设的内存开销
                            • 把选中的乱序文件和对应Overlap的顺序文件放入全局对象列表里

                      3.  selectOverLappedSeqFiles(unseqFile)

                      具体流程:将与该unseqFile乱序文件有Overlap的并且还未被此次合并任务文件选择器选中的顺序文件的索引放入tmpSelectedSeqFiles列表里。具体判断是否有Overlap的做法是:依次遍历获取乱序文件的每个设备ChunkGroup,判断所有还未被此次合并任务选中的顺序文件的该设备ChunkGroup是否有与乱序的ChunkGroup重叠,有的话则选中此顺序文件。

                       


                      合并任务的恢复

                      IOTDB-Server重启后,每个StorageGroup会执行顺序空间和乱序空间内的合并恢复和跨空间的合并恢复。

                      StorageGroupProcessor类里:

                      1)空间内合并恢复

                      recoverInnerSpaceCompaction(isSeq)方法:对该存储组下的每个时间分区里的每个合并日志创建一个空间内合并恢复线程SizeTieredCompactionRecoverTask,并同步执行恢复流程。

                      2)跨空间合并恢复

                              recoverCrossSpaceCompaction()方法:对该存储组下的每个时间分区里的每个合并日志创建一个空间内合并恢复线程RewriteCrossCompactionRecoverTask,并同步执行恢复流程。


                      合并任务的定时执行

                      当server启动后,每个虚拟存储组首先会执行顺序空间和乱序空间内的合并恢复和跨空间的合并恢复,执行完后就会回调submitTimedCompactionTask()方法以定时执行合并任务,具体操作如下:

                      (1) 从该虚拟存储组下的每个时间分区里依次使用合并任务调度器CompactionSchedule根据合并策略去选择一批批待合并文件,并为每批文件创建一个合并任务线程放进CompactionTaskManager里的线程等待队列里

                      (2) 从等待队列里获取一个合并线程并执行

                      合并的加锁流程

                      1. 选择待合并的源文件,并设置setCompactionCandidate(true),将它们封装入一个合并任务里,并放入等待队列中。
                      2. 从等待队列里拿出来,依次对每个源文件加读锁,并检查该源文件是否Valid,若isValidsetCompacting(true),否则释放所有源文件的读锁并setCompacting(false)
                      3. 执行合并,并将临时目标文件移动成最终目标.tsfile文件,生成.resource文件,合并compactionMods文件,更新内存:
                        1. 给TsFileManager加写锁
                        2. 更新 TsFileResourceManager,使用Sychronized移除源文件的 TsFileResource 并增加目标文件的 TsFileResource
                        3. 更新TsFileManagerTsFileResourceList,移除源文件的tsfileResource,插入目标文件的tsFileResource
                        4. 给TsFileManager释放写锁
                      4. 依次对每个源文件释放读锁加写锁,删除源文件和日志。
                      5. 对每个源文件释放写锁setCompacting(false)setCompactionCandidate(false).

                      ...