设计原则
控制合并任务提交的入口数量
能够在原始写入的文件大小不一的情况下,控制合并目标文件大小(如初始写出的 TsFile 大小为 100M,10k,20M)
将 TsFileManagement 里管理 TsFileResource 的数据结构与合并机制分离,固定管理 TsFileResource 的数据结构
能够做到时间分区内部合并任务的并行
能够解决以下场景:
- 场景1:生成海量小文件
- 场景2:合并速度小于写入速度:10个存储组,每个存储组每天会产生10个文件,每个线程每天能合5个文件,系统最多有10个线程
名词解释
顺序空间:顺序数据文件所在的空间
乱序空间:乱序数据文件所在的空间
顺序合并(Seq Compaction):在顺序空间内部进行文件的合并(SizeTired Compaction)
乱序合并(Unseq Compaction):在乱序空间内部进行文件的合并(SizeTired Compaction)
顺乱序合并(Cross Compaction):将乱序空间的文件合并到顺序空间
合并的总体任务
合并调度任务(定时启动):选择合并文件候选集,并提交合并执行任务
合并执行任务:接收待合并文件列表,合并到目标文件,并删除老文件
合并恢复任务(重启时执行一次):将中断的合并执行任务继续执行
合并的参数
enable_seq_space_compaction=true
enable_unseq_space_compaction=true
enable_cross_space_compaction=true (等价之前的 enable_unseq_compaction)
compaction_priority=inner_cross(优先执行空间内合并,快速减少文件数)、cross_inner(优先执行跨文件空间合并,消除乱序文件)、balance(平衡减少文件数和消除乱序文件)
concurrent_compaction_thread=50(合并执行任务的并行度)
compaction_interval=10000(定时合并任务的提交间隔,单位 ms)
TsFileManagement 文件管理器(每个虚拟存储组一个)
Map<Long, 双向链表<TsFileResource>> seqList: 分区 → 顺序文件列表
Map<Long, 双向链表<TsFileResource>> unseqList: 分区 → 乱序文件列表
双向链表功能:
- 管理顺序文件时,按照数据时间戳递增排列
- 管理乱序文件时,按照文件版本号由低到高排列
- 移除某些位置的文件,并且在原位加入合并后的文件
VirtualStorageGroupManager → StorageGroupProsessor
StorageGroupProsessor → VirtualStorageGroupProsessor 虚拟存储组
启动定时任务:
- 倒序遍历时间分区
- CompactionManager.compactionSchedule(TsFileManagement management)
全局合并管理器(与 MManager 同级,系统启动时注册)
static volatile boolean isRecovered=false; (是否恢复完成)
recover():恢复
存储组为之前中断的每个合并执行任务,提交异步合并恢复任务
doCompaction():执行合并调度任务
如果 isRecovered = true,执行 compactionSchedule();
合并调度任务
static long currentTaskNum=0;// 当前进行的合并任务个数
static compactionSchedule(TsFileManagement tsfileManagement):
- tsfileManagement.readLock(resourceList)
- 如果 currentTaskNum < concurrent_compaction_thread
- 如果 compaction_priority=inner_cross || balance
- selectInSpaceFiles(顺序文件链表,true)
- selectInSpaceFiles(乱序文件链表,false)
- crossSpaceCompaction();
- 如果 compaction_priority=cross_inner
- crossSpaceCompaction();
- selectInSpaceFiles(顺序文件链表,true)
- selectInSpaceFiles(乱序文件链表,false)
- 如果 compaction_priority=inner_cross || balance
- 如果 currentTaskNum < concurrent_compaction_thread
- tsfileManagement.readUnLock(resourceList);
selectInSpaceFiles(双向文件链表, isSeq)
- 候选文件列表 Flist=[]
- 倒序遍历文件列表
- 如果 currentTaskNum >= concurrent_compaction_thread || (isSeq && !enable_seq_space_compaction) || (!isSeq && !enable_unseq_space_compaction) return
- 如果(当前文件大小大于 target_tsfile_size || 当前文件正在合并 || 当前文件未关闭),则清空 FList,continue;
- 将当前文件加入 FList
- 如果 FList 内所有文件总大小大于 target_tsfile_size
- 将 FList 提交异步任务(空间内合并执行流程), FList=[],++currentTaskNum
- 如果 compaction_priority=balance
- break;
crossSpaceCompaction()
- 如果 currentTaskNum >= concurrent_compaction_thread || !enable_cross_space_compaction return;
- 选择跨文件空间合并候选文件 F
- 将 F 提交异步任务(跨文件空间合并执行流程)
- ++currentTaskNum
- 如果 compaction_priority=balance, break;
合并执行任务
空间内合并执行流程
跨文件空间合并执行流程
合并恢复任务
顺序空间合并执行流程
乱序空间合并执行流程
跨文件空间合并执行流程