You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 23 Next »


设计原则

减少合并任务提交的入口数量

能够在原始写入的文件大小不一的情况下(如初始写出的 TsFile 大小为 100M,10k,20M),控制合并目标文件大小

将 TsFileManagement 里管理 TsFileResource 的数据结构与 合并机制分离,固定管理 TsFileResource 的数据结构

能够做到时间分区内部合并任务的并行

能够解决以下场景:

  • 场景1:生成海量小文件
  • 场景2:合并速度小于写入速度:10个存储组,每个存储组每天会产生10个文件,每个线程每天能合5个文件,系统最多有10个线程


名词解释

顺序空间:顺序数据文件所在的空间

乱序空间:乱序数据文件所在的空间


顺序空间合并(Seq Compaction):在顺序空间内部进行文件的合并(SizeTired Compaction)

乱序空间合并(Unseq Compaction):在乱序空间内部进行文件的合并(SizeTired Compaction)

跨空间合并(Cross Compaction):将乱序空间的文件合并到顺序空间


合并的总体任务

合并调度任务(定时启动):选择合并文件候选集,并提交合并执行任务

合并执行任务:接收待合并文件列表,合并到目标文件,并删除老文件

合并恢复任务(重启时执行一次):将中断的合并执行任务继续执行


合并调度的参数

enable_seq_space_compaction=true

enable_unseq_space_compaction=true

enable_cross_space_compaction=true (等价之前的 enable_unseq_compaction)

compaction_priority=inner_cross(优先执行空间内合并,快速减少文件数)、cross_inner(优先执行跨文件空间合并,消除乱序文件)、balance(平衡减少文件数和消除乱序文件)

compaction_concurrent_thread=50(合并执行任务的并行度)

compaction_interval=10000(合并调度任务的执行间隔,单位 ms)

compaction_candidate_file_num=100 (候选合并文件个数)

compaction_target_tsfile_size=2000000000 (合并的目标文件大小)

compaction_cross_space_max_select_unseq_file_num=100(跨文件空间合并一次最多提交的乱序文件个数)

TsFileManagement 文件管理器(每个虚拟存储组一个)

Map<Long, 双向链表<TsFileResource>>  seqList:  分区 → 顺序文件列表

Map<Long, 双向链表<TsFileResource>>  unseqList:  分区 → 乱序文件列表


双向链表功能:

  • 管理顺序文件时,按照数据时间戳递增排列
  • 管理乱序文件时,按照文件版本号由低到高排列
  • 移除某些位置的文件,并且在原位加入合并后的文件

VirtualStorageGroupManager → StorageGroupProsessor


StorageGroupProsessor → VirtualStorageGroupProsessor 虚拟存储组

启动定时任务:

  • 倒序遍历时间分区
    • CompactionManager.doCompactionSchedule(TsFileManagement management,long 分区号)


合并管理器 CompactionManager(与 MManager 同级,系统启动时注册)

static volatile boolean isRecovered=false;  (是否恢复完成)

recover():恢复

存储组为之前中断的每个合并执行任务,提交异步合并恢复任务

doCompactionSchedule(TsFileManagement management,long 分区号):执行合并调度任务

如果 isRecovered = true,执行 compactionSchedule();

合并调度任务 compactionSchedule

static  long  currentTaskNum=0;// 当前进行的合并任务个数

static compactionSchedule(TsFileManagement tsfileManagement, long 分区号):

  • if: currentTaskNum >= compaction_concurrent_thread: return;
  • else:
  • tsfileManagement.readLock(resourceList)
  • 顺序文件链表=tsfileManagement.getSeqList(分区号)
  • 乱序文件链表=tsfileManagement.getUnSeqList(分区号)
    • 如果 compaction_priority=balance
      • doCompactionInBalancePriority(); 
    • 如果 compaction_priority=inner_cross
      • doCompactionInInnerSpacePriority();
    • 如果 compaction_priority=cross_inner
      • doCompactionInCrossSpacePriority();
  • tsfileManagement.readUnLock(resourceList);


doCompactionInBalancePriority

  • hasSelected = true
  • while (hasSelected && currentTaskNum < compaction_concurrent_thread)
    • hasSelected = selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
    • hasSelected |= selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
    • hasSelected |= selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)


doCompactionInInnerSpacePriority

  • selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
  • selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
  • selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)


doCompactionInCrossSpacePriority

  • selectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)
  • selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
  • selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)


selectAndSubmitInSpaceFiles(双向文件链表, isSeq): 返回是否选中文件做合并

  • hasSelect = false
  • 候选文件列表 Flist=[]
  • 倒序遍历文件列表
    • 如果  (任务已经足够多:currentTaskNum >= compaction_concurrent_thread)
            || (无需执行顺序合并: isSeq && !enable_seq_space_compaction)
            || (无需执行乱序合并: !isSeq && !enable_unseq_space_compaction)
      • return hasSelected
    • 如果(当前文件大小 > target_tsfile_size || 当前文件正在合并 || 当前文件未关闭)
      • 清空 FList;
      • continue;
    • 将当前文件加入 FList
    • 如果 FList 内所有文件总大小大于 compaction_target_tsfile_size || FList 内文件数达到 compaction_candidate_file_num
      • 将 FList 拷贝提交异步任务(空间内合并执行流程), FList=[],++currentTaskNum, hasSelected=true
      • 如果 compaction_priority=balance
        • break;
  • return hasSelected



选出来的顺序文件:已关闭、未正在合并、至少与一个选出的乱序文件重叠

选出来的乱序文件:已关闭、未正在合并、所有与此乱序文件重叠的顺序文件(至少一个)都被选中


selectAndSubmitCrossSpaceFiles(顺序文件链表 S, 乱序文件链表 U): 返回是否选中文件做合并

  • hasSelected = false
  • 如果 currentTaskNum >= compaction_concurrent_thread
           || !enable_cross_space_compaction
    • return hasSelected;
  • 候选乱序文件 UF = []
  • 候选顺序文件 SF = []
  • hasSelected = false
  • for 乱序文件 u in  乱序文件链表 U
    • if (u 未关闭 || u 正在合并)
      • if submitIfNotEmpty(SF, UF): 
        • 如果 compaction_priority=balance, break;
        • if ( 线程池满 ) break;
      • continue;
    • 临时选中顺序文件 TmpSF = [] 
    • continueToNextUnseqFile = false
    • for 顺序文件 s in 顺序文件链表 S
      • if isOverlap(u, s)
        • if s 未关闭 || s 正在合并
          • if submitIfNotEmpty(SF, UF): 
            • 如果 compaction_priority=balance, return hasSelected;
          • continueToNextUnseqFile = true;  break
        • else
          • TmpSF.add(s)
    • if continueToNextUnseqFile
      • continue
    • SF.addAll(tmpSF), UF.add(u)
    • if UF.size() >= compaction_cross_space_max_select_unseq_file_num
      • if submitIfNotEmpty(SF, UF)
        • if compaction_priority=balance, break
        • if ( 线程池满 ) break;
  • submitIfNotEmpty(SF, UF)
  • return hasSelected


合并执行任务

空间内合并执行流程

跨文件空间合并执行流程


合并恢复任务

顺序空间合并执行流程

乱序空间合并执行流程

跨文件空间合并执行流程



  • No labels