You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 27 Next »


设计原则

减少合并任务提交的入口数量

能够在原始写入的文件大小不一的情况下(如初始写出的 TsFile 大小为 100M,10k,20M),控制合并目标文件大小

将 TsFileManagement 里管理 TsFileResource 的数据结构与 合并机制分离,固定管理 TsFileResource 的数据结构

能够做到时间分区内部合并任务的并行

能够解决以下场景:

  • 场景1:生成海量小文件
  • 场景2:合并速度小于写入速度:10个存储组,每个存储组每天会产生10个文件,每个线程每天能合5个文件,系统最多有10个线程


名词解释

顺序空间:顺序数据文件所在的空间

乱序空间:乱序数据文件所在的空间


顺序空间合并(Seq Compaction):在顺序空间内部进行文件的合并(SizeTired Compaction)

乱序空间合并(Unseq Compaction):在乱序空间内部进行文件的合并(SizeTired Compaction)

跨空间合并(Cross Compaction):将乱序空间的文件合并到顺序空间


合并的总体任务

合并调度任务(定时启动):选择合并文件候选集,并提交合并执行任务

合并执行任务:接收待合并文件列表,合并到目标文件,并删除老文件

合并恢复任务(重启时执行一次):将中断的合并执行任务继续执行


合并调度的参数

enable_seq_space_compaction=true

enable_unseq_space_compaction=true

enable_cross_space_compaction=true (等价之前的 enable_unseq_compaction)

inner_compaction_strategy=SIZE_TIRED_COMPACTION (空间内合并策略)

cross_compaction_strategy=INPLACE(跨空间合并策略)

compaction_priority=inner_cross(优先执行空间内合并,快速减少文件数)、cross_inner(优先执行跨文件空间合并,消除乱序文件)、balance(平衡减少文件数和消除乱序文件)

compaction_concurrent_thread=50(合并执行任务的并行度)

compaction_interval=10000(合并调度任务的执行间隔,单位 ms)

compaction_candidate_file_num=100 (候选合并文件个数)

compaction_target_tsfile_size=2000000000 (合并的目标文件大小)

compaction_cross_space_max_select_unseq_file_num=100(跨文件空间合并一次最多提交的乱序文件个数)

TsFileManagement 文件管理器(每个虚拟存储组一个)

Map<Long, 双向链表<TsFileResource>>  seqList:  分区 → 顺序文件列表

Map<Long, 双向链表<TsFileResource>>  unseqList:  分区 → 乱序文件列表


双向链表功能:

  • 管理顺序文件时,按照数据时间戳递增排列
  • 管理乱序文件时,按照文件版本号由低到高排列
  • 移除某些位置的文件,并且在原位加入合并后的文件

VirtualStorageGroupManager → StorageGroupProsessor


StorageGroupProsessor → VirtualStorageGroupProsessor 虚拟存储组

启动定时任务:

  • 倒序遍历时间分区
    • CompactionManager.doCompactionSchedule(TsFileManagement management,long 分区号)


合并管理器 CompactionManager(与 MManager 同级,系统启动时注册)

static volatile boolean isRecovered=false;  (是否恢复完成)

recover():恢复

存储组为之前中断的每个合并执行任务,提交异步合并恢复任务

doCompactionSchedule(TsFileManagement management,long 分区号):执行合并调度任务

如果 isRecovered = true,执行 compactionSchedule();

合并调度任务 compactionSchedule

static  long  currentTaskNum=0;// 当前进行的合并任务个数

static compactionSchedule(TsFileManagement tsfileManagement, long 分区号):

  • if: currentTaskNum >= compaction_concurrent_thread: return;
  • else:
  • tsfileManagement.readLock(resourceList)
  • 顺序文件链表=tsfileManagement.getSeqList(分区号)
  • 乱序文件链表=tsfileManagement.getUnSeqList(分区号)
    • 如果 compaction_priority=balance
      • doCompactionInBalancePriority(); 
    • 如果 compaction_priority=inner_cross
      • doCompactionInInnerSpacePriority();
    • 如果 compaction_priority=cross_inner
      • doCompactionInCrossSpacePriority();
  • tsfileManagement.readUnLock(resourceList);


doCompactionInBalancePriority

  • hasSelected = true
  • while (hasSelected && currentTaskNum < compaction_concurrent_thread)
    • hasSelected = selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
    • hasSelected |= selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
    • hasSelected |= selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)


doCompactionInInnerSpacePriority

  • selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
  • selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
  • selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)


doCompactionInCrossSpacePriority

  • selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)
  • selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
  • selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)


selectAndSubmitInSpaceFiles(双向文件链表, isSeq): 返回是否选中文件做合并

  • hasSelect = false
  • 候选文件列表 Flist=[]
  • 倒序遍历文件列表
    • 如果  (任务已经足够多:currentTaskNum >= compaction_concurrent_thread)
            || (无需执行顺序合并: isSeq && !enable_seq_space_compaction)
            || (无需执行乱序合并: !isSeq && !enable_unseq_space_compaction)
      • return hasSelected
    • 如果(当前文件大小 > target_tsfile_size || 当前文件正在合并 || 当前文件未关闭)
      • 清空 FList;
      • continue;
    • 将当前文件加入 FList
    • 如果 FList 内所有文件总大小大于 compaction_target_tsfile_size || FList 内文件数达到 compaction_candidate_file_num
      • 将 FList 拷贝提交异步任务(空间内合并执行流程), FList=[],++currentTaskNum, hasSelected=true
      • 如果 compaction_priority=balance
        • break;
  • return hasSelected



选出来的顺序文件:已关闭、未正在合并、至少与一个选出的乱序文件重叠

选出来的乱序文件:已关闭、未正在合并、所有与此乱序文件重叠的顺序文件(至少一个)都被选中


selectAndSubmitCrossSpaceFiles(顺序文件链表 S, 乱序文件链表 U): 返回是否选中文件做合并

  • hasSelected = false
  • 如果 currentTaskNum >= compaction_concurrent_thread
           || !enable_cross_space_compaction
    • return hasSelected;
  • 候选乱序文件 UF = []
  • 候选顺序文件 SF = []
  • hasSelected = false
  • for 乱序文件 u in  乱序文件链表 U
    • if (u 未关闭 || u 正在合并)
      • if submitIfNotEmpty(SF, UF): 
        • 如果 compaction_priority=balance, break;
        • if ( 线程池满 ) break;
      • continue;
    • 临时选中顺序文件 TmpSF = [] 
    • continueToNextUnseqFile = false
    • for 顺序文件 s in 顺序文件链表 S
      • if isOverlap(u, s)
        • if s 未关闭 || s 正在合并
          • if submitIfNotEmpty(SF, UF): 
            • 如果 compaction_priority=balance, return hasSelected;
          • continueToNextUnseqFile = true;  break
        • else
          • TmpSF.add(s)
    • if continueToNextUnseqFile
      • continue
    • SF.addAll(tmpSF), UF.add(u)
    • if UF.size() >= compaction_cross_space_max_select_unseq_file_num
      • if submitIfNotEmpty(SF, UF)
        • if compaction_priority=balance, break
        • if ( 线程池满 ) break;
  • submitIfNotEmpty(SF, UF)
  • return hasSelected


合并执行任务

空间内合并执行流程

跨文件空间合并执行流程


合并恢复任务

顺序空间合并执行流程

乱序空间合并执行流程

跨文件空间合并执行流程


合并任务管理器CompactionTaskManager

该类负责管理合并任务的运行,例如把合并任务加入到等待队列里,并定时从等待队列里拿取高优先级的线程进行执行。

 

  1. 重要属性
    1. compactionTaskQueue:合并线程的等待队列,可容纳的任务数量为1000个,若满了则会踢掉低优先级的线程
    2. 两个线程池
      1. taskExecutionPool:合并任务的执行线程池,系统预设的线程数量为10
      2. compactionTaskSubmissionThreadPool:定时执行线程池,该池里的线程负责定时从等待队列里拿取一个合并任务进行执行。系统预设的线程数量为1个。
    3. currentTaskNum:当前正在执行合并任务的线程数量
    4. Task_Submit_Interval:每个合并线程执行的时间间隔,系统预设值为1


合并任务调度器CompactionSchedule

该类用于根据系统预设的合并优先级策略去创建对应的文件选择器,并使用文件选择器去选择待合并的一批批TsFile并为每一批文件创建一个合并任务(空间内合并、跨空间合并)线程放进CompactionTaskManager的等待队列里。

 

  1. 方法详解

1) static scheduleCompaction (TsFileManagement tsfileManagement, long 分区号)

    • readLock(resourceList)
    • 顺序文件链表=tsfileManagement.getSeqList(分区号)
    • 乱序文件链表=tsfileManagement.getUnSeqList(分区号)
      • 如果 compaction_priority=balance
        • doCompactionBalancePriority(); 
      • 如果 compaction_priority=inner_cross
        • doCompactionInnerCrossPriority();
      • 如果compaction_priority=cross_inner
        • doCompactionCrossInnerPriority();
      • readUnLock(resourceList);

2) doCompactionBalancePriority

    • taskSubmitted = true
    • while (taskSubmitted && currentTaskNum < concurrentCompactionThread)
      • taskSubmitted = tryToSubmitInnerSpaceCompactionTask(顺序文件链表,isSeq=true…)
      • hasSelected |= tryToSubmitInnerSpaceCompactionTask (乱序文件链表,isSeq=false…)
      • hasSelected |= tryToSubmitCrossSpaceCompactionTask (顺序文件链表, 乱序文件链表,…)

3) doCompactionInnerCrossPriority

    • tryToSubmitInnerSpaceCompactionTask(顺序文件链表,isSeq=true…)
    • tryToSubmitInnerSpaceCompactionTask (乱序文件链表,isSeq=false…)
    • tryToSubmitCrossSpaceCompactionTask (顺序文件链表, 乱序文件链表,…)

4)  doCompactionCrossInnerPriority

    • tryToSubmitCrossSpaceCompactionTask (顺序文件链表, 乱序文件链表,…)
    • tryToSubmitInnerSpaceCompactionTask(顺序文件链表,isSeq=true…)
    • tryToSubmitInnerSpaceCompactionTask (乱序文件链表,isSeq=false…)
      • tryToSubmitInnerSpaceCompactionTask (文件链表, isSeq, …): 返回是否选中文件做合并
    • if ((无需执行顺序合并: isSeq && !enable_seq_space_compaction) || (无需执行乱序合并: !isSeq && !enable_unseq_space_compaction))
      • return false
    • 获取空间内合并的文件选择器AbstractInnerSpaceCompactionSelector
    • return innerSpaceCompactionSelector.selectAndSubmit()
      • tryToSubmitCorssSpaceCompactionTask (顺序文件链表, 乱序文件链表, …): 返回是否选中文件做合并
    • if (无需跨空间合并:!enable_cross_space_compaction)
      • return false
    • 获取跨空间合并的文件选择器AbstractCrossSpaceCompactionSelector
    • return crossSpaceCompactionSelector.selectAndSubmit()

合并任务的文件选择器

根据一定的策略选择待被合并的一批批TsFile文件,并为每批TsFile文件创建一个合并任务放入合并任务管理器CompactionTaskManager的任务等待队列里。

空间内合并的文件选择器AbstractInnerSpaceCompactionSelector

  1. 重要属性
    1. tsFileResources:文件链表,存放着该虚拟存储组下的该时间分区下的所有顺序或者乱序文件。
    2. sequence:顺序or乱序
    3. taskFactory:任务工厂,负责为每批待合并的TsFile创建一个合并任务

SizeTieredCompactionSelector选择器

  1. 重要方法

1) selectAndSubmit():返回是否找到文件并提交合并任务

//该方法从0层向最高层,逐层搜索是否有待被合并的一批批文件,若某层上有一批以上的待合并文件,则不再向高层搜索,并且为每批文件创建合并线程,并放入合并任务管理器CompactionTaskManager的任务等待队列里

    • readLock();
    • 创建任务优先级队列taskPriorityQueue//负责存放每批待合并文件的TsFileResource列表和文件的总大小
    • 获取所有的顺序或者乱序文件里空间内合并的最大层数maxLevel=searchFileMaxLevel();
    • 从第0层开始向最高层,针对每一层执行如下文件搜索
    • if!selectLevelTask(currentLevel,taskPriorityQueue)//若当前层搜索到一批以上待合并的文件,则把他们的TsFileResource列表和总文件大小放入taskPriorityQueue队列里,且不继续向高层搜索文件
      • break
    •  遍历taskPriorityQueue里每个元素 
      • 使用taskFactory对任务优先级队列里的该批文件创建合并线程,并放入合并任务管理器CompactionTaskManager的任务等待队列里

 

2) selectLevelTask(currentLevel,taskPriorityQueue) :返回是否要向高层level继续搜索  

//该方法搜索第level层上的所有文件,若该层上有连续的文件满足系统预设的条件(数量超过10个或者总文件大小超过2G)则为该批文件创建一个合并任务放进taskPriorityQueue队列里,并继续搜索下一批。若在该层上搜索到至少一批以上待合并文件,则返回false(示意不再向高层搜索),否则返回true。

    • 创建临时选中的文件列表selectedFileList和选中的文件总大小selectedFileSize
    • 创建Boolean变量shouldContinueToSearch=true//当在该层找到了一批以上的待合并的文件后则会把该变量设为false,示意文件选择器不再往高层
    • 依次遍历该虚拟存储组的该时间分区下的所有顺序文件,
      • 若该顺序文件的空间内合并层级不等于currentLevel
        • 清空selectedFileListselectedFileSize,遍历下一个顺序文件
      • 否则该顺序文件的空间内合并层级等于currentLeve,则把他加入到selectedFileList,并且selectedFileSize加上该文件大小
      • (被选中的文件数量>=系统预设值为10) || (被选中的文件的总大小>=系统预设值为2GB
        • 把该批选中的文件列表及其总文件大小放入taskPriorityQueue队列里。并清空selectedFileListselectedFileSize,将shouldContinueToSearch设为
      • 创建临时选中的文件列表selectedFileList和选中的文件总大小selectedFileSize

 

跨空间合并的文件选择器AbstractCrossSpaceCompactionSelector

  1. 重要属性
    1. sequenceFileResources:顺序文件链表,存放着该虚拟存储组下的该时间分区下的所有顺序文件。
    2. unsequenceFileResources:乱序文件链表,存放着该虚拟存储组下的该时间分区下的所有乱序文件。
    3. taskFactory:任务工厂,负责为每批待合并的TsFile创建一个合并任务

 

InplaceCompactionSelector选择器

  1. 重要方法

1)selectAndSubmit():返回是否找到文件并提交合并任务

//该方法根据跨空间合并的文件选择策略创建具体的文件选择器,使用文件选择器选取该虚拟存储组下该时间分区下的所有待被合并的乱序文件和顺序文件,并为他们创建一个合并任务放入合并任务管理器CompactionTaskManager的任务等待队列里

    • 若 (当前正在合并的线程数量>=系统预设的数量为10)||(系统预设为不允许跨空间合并:!enableCrossSpaceCompaction||(当前虚拟存储组的当前时间分区存在合并任务) //有个基类,跨空间和空间内合并的文件选择器的基类
      • return false
    • 若该存储组的该时间分区下不存在顺序文件或乱序文件
      • return false   //注意:当只有乱序文件而没有顺序文件时,则应该移动或者重写此乱序文件(可能顺序文件的数据被删掉了,而乱序文件还存在)
    • 若乱序文件的数量>系统预设的合并任务最大的文件数量为10
      • 截取前几个乱序文件,数量为maxCompactionCandidateFileNum
    • 创建跨空间合并的资源管理器mergeResource
    • 根据系统预设的跨空间合并的文件选择策略(Max_File_Num或者Max_Series_Num)创建具体的文件选择器ICrossSpaceMergeFileSelector
    • crossSpaceMergeFileSelector.select(),使用具体的文件选择器选取所有的待被合并的乱序文件和顺序文件,装入mergeFiles数组里,第一个元素是待合并的顺序文件列表,第二个元素是待合并的乱序文件列表
    • mergeFiles长度为0
      • return false
    • mergeResource.clear()
    • mergeResource.setCacheDeviceMeta()
    • 使用任务工厂taskFactory和选中待合并的乱序和顺序文件创建合并任务,并添加到合并任务管理器CompactionTaskManager的任务等待队列里
    • return true


文件选择器ICrossSpaceMergeFileSelector

MaxFileMergeFileSelector

下面详细介绍选择待合并文件的方法:

  1.   select()方法:返回的数组里第一个元素是待合并的顺序文件列表,第二个元素是待合并的乱序文件列表

1)具体流程:在系统预设的时间内(30秒),针对每个乱序文件查找与其Overlap且还未被此次合并任务文件选择器选中的顺序文件列表,每找到一个乱序文件及其对应的Overlap顺序文件列表后就预估他们进行合并可能增加的额外内存开销(优先使用loose估计),若未超过系统给合并线程预设的内存开销,则把他们放入到此合并任务选中的顺序和乱序文件里。更新该虚拟存储组下该时间分区的跨空间合并资源管理器里的顺序文件和乱序文件列表,移除其未被选中的文件的SequenceReader以清缓存。

2)计算合并某一乱序文件与其对应的Overlap的顺序文件会新增的内存开销的方法:

//loose估计出的大小会偏大、误差大;而tight估计出的大小精确到具体合并的有多少序列,误差小

(1)Loose估计:乱序文件占用的大小(即整个乱序文件大小,因为可能会读取出乱序文件里的所有Chunk+顺序文件占用的大小(由于每次只读取一个顺序文件,因此cost记录顺序文件里最大的metadata大小;写入的时候每个文件都会有metadata,因此cost还要把所有顺序文件的metadata加起来)

(2)Tight估计:计算获取查询该乱序文件里concurrentMergeNum个时间序列共要花费的可能最大内存空间,即(总文件大小*MaxChunkNum/totalChunkNum);计算获取查询该顺序文件里concurrentMergeNum个时间序列共要花费的可能最大内存空间,即(总索引区大小*MaxChunkNum序列最多含有的Chunk数量/totalChunkNum所有序列Chunk数量和)

        优先使用loose估计内存的原因是速度快,可是估计结果偏大,误差大,可能导致估计的内存太大超过系统给合并任务预值的内存值,因此该合并任务就没有一个被选中待合并的乱序和对应Overlap的顺序文件。若发生这种情况,再使用tight估计内存来选取待合并文件,它所估计的内存精确到具体要访问几个序列,较精确,可是要访问文件磁盘上每个序列的具体Chunk数量,速度较慢。


2.  select(isTight)方法   

    • 获取系统给合并任务的文件选择预设的时间阈值为30秒·       
    • 遍历每个乱序文件,且当选择文件的耗时还未超过系统预设值时
      • 当被选中待合并的顺序文件的数量 != 所有顺序文件的数量
        • selectOverLappedSeqFiles(unseqFile)选中与该乱序文件Overlap的顺序文件
      • 若该乱序文件或者与其Overlap的某一顺序文件未关闭或正在merging§  清空临时变量,遍历下个乱序文件
      • 计算合并此乱序文件与其对应Overlap的顺序文件列表可能会新增的内存开销
      • 若 合并此乱序文件与其对应Overlap的顺序文件列表可能会新增的内存开销加上原有的其他乱序文件进行合并的开销不会超过系统给合并线程预设的内存开销
        • 把选中的乱序文件和对应Overlap的顺序文件放入全局对象列表里

3.  selectOverLappedSeqFiles(unseqFile)

具体流程:将与该unseqFile乱序文件有Overlap的并且还未被此次合并任务文件选择器选中的顺序文件的索引放入tmpSelectedSeqFiles列表里。具体判断是否有Overlap的做法是:依次遍历获取乱序文件的每个设备ChunkGroup,判断所有还未被此次合并任务选中的顺序文件的该设备ChunkGroup是否有与乱序的ChunkGroup重叠,有的话则选中此顺序文件。

 

MaxSeriesMergeFileSelector


  • No labels