Table of Contents |
---|
设计原则
能够在原始写入的文件大小不一的情况下(如初始写出的 TsFile 大小为 100M,10k,20M),控制合并目标文件大小和 Chunk 大小
将 TsFileManagement 里管理 TsFileResource 的数据结构与 合并机制分离
能够做到时间分区内部合并任务的并行
能够解决以下场景:
- 场景1:生成海量小文件
- 场景2:合并速度小于写入速度:10个存储组,每个存储组每天会产生10个文件,每个线程每天能合5个文件,系统最多有10个线程
名词解释
顺序空间:顺序数据文件所在的空间
乱序空间:乱序数据文件所在的空间
整体流程
合并任务提交
- 如果 compactionMergeWorking 变量为true,则说明上一次合并还在进行,跳过本次合并
- 如果 compactionMergeWorking 变量为false,说明本存储组没有合并任务在进行
- 设置 compactionMergeWorking 为true
- 拷贝当前文件列表镜像,见算法1
- 设置如果进行乱序合并,是否进行 FullMerge
- 提交合并任务
算法1
- 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
- 遍历上述的time-value列表,将数据写入新的ChunkWriter
- 判断限流
- 将ChunkWriter写入新文件
FullMerge(完全合并)
出于IO和合并性能考虑,乱序合并不会重写整个顺序文件,获取一个整理完全的顺序文件,而是会将与乱序文件重叠的 chunk 合并重写后,追加到原顺序文件后面,不删除原chunk,称这个过程为 原地合并
但是过多的 原地合并 会造成无用的数据越来越多,甚至在某些情况下导致一个文件大小无限制地增大(见线上问题分析)
而 完全合并 则是在乱序合并时重写整个顺序文件,IO和合并性能较低,但能获取一个整理完全的顺序文件
合并任务执行
- 根据 seq_file_num_in_each_level / unseq_file_num_in_each_level 获取此次待合并层级中的 TsFileResource,作为待合并文件列表
- 对待合并文件列表中的每个文件,创建一个 TsFIleSequenceReader,并存在 ReaderCache 中,并获取该文件的设备列表,生成所有的 device 集合
- 遍历device集合
- 根据 ReaderCache 得到相应的TsFileSequenceReader
- 根据 device 读取各个文件对应的ChunkMetadataListIterator
- 循环算法2输出的待合并的 IMeasurementSchema 列表
- 对于待合并的 IMeasurementSchema
- 如果是乱序的文件合并,采取 存在对齐时间序列的反序列化 Page 合并算法
- 如果是顺序空间的文件合并
- 如果某个 Chunk(如果是对齐时间序列,判断 TimeChunk) 的数据点数小于 merge_page_point_number,采取 存在对齐时间序列的反序列化 Page 合并算法
- 如果page足够大,采取 存在对齐时间序列的追加 Page 合并算法
- 关闭 ReaderCache 中所有的reader
- 序列化新文件的 TsFileResource
- 关闭新文件writer
算法2
输入:多个文件的 ChunkMetadataListIterator,Iterator 每次输出的 List 内的 sensor 个数(包含对齐时间序列内的序列数,不包含 time)为 max_degree_of_index_node(由于对齐时间序列是个整体,因此可能会超过部分)
输出:每轮待合并的 sensor 列表
描述:每个迭代器取1个 List,找到每个 List 的最大字典序(对齐时间序列按 time 的名字 $#$id 来比较)的 sensor,组成集合 S,本次合并从头开始合并到 S 中最小字典序的 sensor 。并从 List 中清除已合并的 sensor,如果此 List 消耗完,获取下一个 Lst
优势:每次至少会消耗完一个 file 的一个 List
Iterator 每次输出的 List 的例子:文件内的序列为:(time, s4,s5,s6), (time, s9, s10), s1,s2,s3,s7,s8
如果 max_degree_of_index_node 为 2, 每次取出的为:(time, s4,s5,s6); (time, s9, s10); s1,s2; s3,s7; s8
如果 max_degree_of_index_node 为 3,每次取出的为:(time, s4,s5,s6); (time, s9, s10), s1; s2, s3,s7; s8
如果 max_degree_of_index_node 为 4,每次取出的为:(time, s4,s5,s6), (time, s9, s10); s1 s2, s3,s7; s8
存在对齐时间序列的反序列化 Page 合并算法
- 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
- 如果是MeasurementSchema,使用ChunkReader
- 如果是VectorMeasurementSchema,使用VectorChunkReader
- 遍历上述的time-value列表,将数据写入新的IChunkWriter
- 如果是MeasurementSchema,使用ChunkWriterImpl
- 如果是VectorMeasurementSchema,使用VectorChunkWriterImpl
- 判断限流
- 将IChunkWriter写入新文件
存在对齐时间序列的追加 Page 合并算法
- 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新Chunk和ChunkMetadata
- 如果是MeasurementSchema,仅需要前一个chunk和后一个chunk进行合并
- 如果是VectorMeasurementSchema,需要前一个VectorChunkMetadata对应的所有timeChunk和valueChunk与后一个所有的timeChunk和valueChunk合并
- 判断限流
- 将合并完的Chunk和ChunkMetadata写入新文件
例子:
VectorChunk1: timeChunk(page1,page2) s1Chunk(page3, page4) s2Chunk(page5)
VectorChunk2: timeChunk(page6) s1Chunk(page7) s2Chunk(page8)
合并后: timeChunk(page1, page2, page6) s1Chunk(page3, page4, page7), s2Chunk(page5, page8)
直接追加chunk合并
原流程存在的问题
对于层级合并来说(以顺序空间的层级合并为例),是通过seq_level_num和seq_file_num_in_each_level这两个参数来控制合并到最后的chunk大小的,即把原来的chunk扩大seq_file_num_in_each_levelseq_level_num-1倍,这种配置方案存在如下三个问题:
1、在配置时需要知道用户原先写出的一个chunk的大小,并根据实际经验来配置。这样的配置方式对用户来说难以自己上手配置,对于每一个用户来说,也都需要我们去观察文件并辅助配置,非常不方便
2、一旦用户使用了默认参数没有修改或者配置错误,而用户本身场景又不需要合并太多次文件,那么就会占用很多无效的磁盘IO,甚至降低查询效率
3、在一些场景中,不同时间序列的写入速度是不一样的,那么按照这种合并方式,最后会合并出很多大小不一的chunk,写入速度慢的时间序列对应的chunk小,写入速度快的时间序列对应的chunk大,造成chunk大小不均匀,难以控制
例子:
假设用户需要的目标chunk是4倍大小的原始chunk,有如下文件
0层:file1(s1,s2,s2)+file2(s2,s2,s2) 即file1有1个s1的chunk,2个s2的chunk,file2有3个s2的chunk
合并至1层:file3(s1(1),s2(5)) 合并出来的文件有1个s1的chunk,1个5倍大小的s2的chunk,假设他与另一个file4(s1(1),s2(5))进行合并
合并至2层:file5(s1(2),s2(9)) 合并出来的文件有1个2倍大小的s1的chunk,1个9倍大小的s2的chunk
可以看到s2的chunk越合并越大,已经远远超过用户需要的chunk大小,s1对应的chunk却仍没有达到用户的要求
解决方案
使用配置merge_chunk_point_number_threshold在合并每一个chunk的时候控制,
如果在待合并列表中这个sensor对应的所有chunk都已经达到了这个阈值,则不再合并chunk,直接将读出来的chunk写入新文件
跨文件空间合并(将乱序文件合并至顺序空间)
不存在对齐时间序列的消除乱序文件合并流程
输入一组乱序文件U,以及一组顺序文件S,最大同时合并的时间序列数量为n,单个chunk的点数阈值tpt,是否进行Full Merge
...
顺序空间合并(Seq Compaction):在顺序空间内部进行文件的合并(SizeTired Compaction)
乱序空间合并(Unseq Compaction):在乱序空间内部进行文件的合并(SizeTired Compaction)
跨空间合并(Cross Compaction):将乱序空间的文件合并到顺序空间
合并的总体任务
合并调度任务(定时启动):选择合并文件候选集,并提交合并执行任务
合并执行任务:接收待合并文件列表,合并到目标文件,并删除老文件
合并恢复任务(重启时执行一次):将中断的合并执行任务继续执行
合并调度的参数
enable_seq_space_compaction=true
enable_unseq_space_compaction=true
enable_cross_space_compaction=true (等价之前的 enable_unseq_compaction)
compaction_priority=inner_cross(优先执行空间内合并,快速减少文件数)、cross_inner(优先执行跨文件空间合并,消除乱序文件)、balance(平衡减少文件数和消除乱序文件)
compaction_concurrent_thread=50(合并执行任务的并行度)
compaction_interval=10000(合并调度任务的执行间隔,单位 ms)
compaction_candidate_file_num=100 (候选合并文件个数)
compaction_target_tsfile_size=2000000000 (合并的目标文件大小)
compaction_cross_space_max_select_unseq_file_num=100(跨文件空间合并一次最多提交的乱序文件个数)
TsFileManagement 文件管理器(每个虚拟存储组一个)
Map<Long, 双向链表<TsFileResource>> seqList: 分区 → 顺序文件列表
Map<Long, 双向链表<TsFileResource>> unseqList: 分区 → 乱序文件列表
双向链表功能:
- 管理顺序文件时,按照数据时间戳递增排列
- 管理乱序文件时,按照文件版本号由低到高排列
- 移除某些位置的文件,并且在原位加入合并后的文件
待改名:
VirtualStorageGroupManager → StorageGroupProsessor
StorageGroupProsessor → VirtualStorageGroupProsessor 虚拟存储组
合并的整体流程
合并任务管理器CompactionTaskManager
该类负责管理合并任务的运行,例如把合并任务加入到等待队列里,并定时从等待队列里拿取高优先级的线程进行执行。
- 重要属性
- candidateCompactionTaskQueue:合并线程的等待队列,可容纳的任务数量为1000个,若满了则会踢掉低优先级的线程
- 两个线程池
- taskExecutionPool:合并任务的执行线程池,系统预设的线程数量为10个
- compactionTaskSubmissionThreadPool:定时执行线程池,该池里的线程负责定时从等待队列里拿取一个合并任务进行执行。系统预设的线程数量为1个。
- runningCompactionTaskList:正在执行合并的任务列表
- currentTaskNum:当前正在执行合并任务的线程数量
- Task_Submit_Interval:每个合并线程执行的时间间隔,系统预设值为1秒
- 合并任务的优先级
candidateCompactionTaskQueue队列里,任务是根据自定义的优先级进行排序的,优先级高的任务会被优先执行。当该队列满时,若添加新的合并任务,则会把优先级最低的给踢掉。下面讲解合并任务的优先级定义规则,在CompactionTaskComparator类里:
- 若两个任务一个是空间内合并,另一个是跨空间合并,且系统预设的合并优先级不是balance
- 若系统预设的合并执行优先级是inner_cross,则空间内合并的任务的优先级较高
- 若系统预设的合并执行优先级是cross_inner,则跨空间合并的任务的优先级较高
- 若两个任务都是空间内合并
顺序空间内合并 > 乱序空间内合并
优先合并【低层文件(空间内合并次数低的)】:若两个任务的平均每个待合并文件的空间内合并次数不相等,则平均每个文件空间内合并次数低的任务的优先级较高(因为这样可以减少写放大)
优先合并【文件数量多的】:可以减少大量文件
优先合并【待合并文件总大小小的】:(因为小文件的合并速度较快)
优先合并【新文件】:若两个任务的待合并文件的最大version不同,则较大version的任务的优先级高(因为我们想要优先合并最近落盘的文件)
- 若两个任务都是跨空间合并
优先合并【顺序文件少的】:若两个任务的待合并的顺序文件的数量不一样,则顺序文件数量较少的任务的优先级高(因为在合并过程中占用更少的内存)
乱序文件数量较多的任务的优先级较高(因为会减少更多的乱序文件)
- 若两个任务一个是空间内合并,另一个是跨空间合并,且系统预设的合并优先级不是balance
合并任务调度器CompactionSchedule
该类用于根据系统预设的合并优先级策略去创建对应的文件选择器,并使用文件选择器去选择待合并的一批批TsFile并为每一批文件创建一个合并任务(空间内合并、跨空间合并)线程放进CompactionTaskManager的等待队列里。
- 方法详解
1) static scheduleCompaction (TsFileManagement tsfileManagement, long 分区号):
- readLock(resourceList)
- 顺序文件链表=tsfileManagement.getSeqList(分区号)
- 乱序文件链表=tsfileManagement.getUnSeqList(分区号)
- 如果 compaction_priority=balance
- doCompactionBalancePriority();
- 如果 compaction_priority=inner_cross
- doCompactionInnerCrossPriority();
- 如果compaction_priority=cross_inner
- doCompactionCrossInnerPriority();
- readUnLock(resourceList);
- 如果 compaction_priority=balance
2) doCompactionBalancePriority
- taskSubmitted = true
- while (taskSubmitted && currentTaskNum < concurrentCompactionThread)
- taskSubmitted = tryToSubmitInnerSpaceCompactionTask(顺序文件链表,isSeq=true,…)
- hasSelected |= tryToSubmitInnerSpaceCompactionTask (乱序文件链表,isSeq=false,…)
- hasSelected |= tryToSubmitCrossSpaceCompactionTask (顺序文件链表, 乱序文件链表,…)
3) doCompactionInnerCrossPriority
- tryToSubmitInnerSpaceCompactionTask(顺序文件链表,isSeq=true,…)
- tryToSubmitInnerSpaceCompactionTask (乱序文件链表,isSeq=false,…)
- tryToSubmitCrossSpaceCompactionTask (顺序文件链表, 乱序文件链表,…)
4) doCompactionCrossInnerPriority
- tryToSubmitCrossSpaceCompactionTask (顺序文件链表, 乱序文件链表,…)
- tryToSubmitInnerSpaceCompactionTask(顺序文件链表,isSeq=true,…)
- tryToSubmitInnerSpaceCompactionTask (乱序文件链表,isSeq=false,…)
- tryToSubmitInnerSpaceCompactionTask (文件链表, isSeq, …): 返回是否选中文件做合并
- if ((无需执行顺序合并: isSeq && !enable_seq_space_compaction) || (无需执行乱序合并: !isSeq && !enable_unseq_space_compaction))
- return false
- 获取空间内合并的文件选择器AbstractInnerSpaceCompactionSelector
- return innerSpaceCompactionSelector.selectAndSubmit()
- tryToSubmitCorssSpaceCompactionTask (顺序文件链表, 乱序文件链表, …): 返回是否选中文件做合并
- if (无需跨空间合并:!enable_cross_space_compaction)
- return false
- 获取跨空间合并的文件选择器AbstractCrossSpaceCompactionSelector
- return crossSpaceCompactionSelector.selectAndSubmit()
合并任务的文件选择器AbstractCompactionSelector
根据一定的策略选择待被合并的一批批TsFile文件,并为每批TsFile文件创建一个合并任务放入合并任务管理器CompactionTaskManager的任务等待队列里。
空间内合并是从低层向高层搜索某层若满足连续10个文件或文件总大小超过2G则将该批文件封装到一个合并任务里,因此一次搜索某层可能出现好几批待合并文件,分别封装成好几个合并任务;跨空间合并则是针对该虚拟存储组下该时间分区下的所有乱序文件与其有Overlap的顺序文件都封装在一个合并任务里,具体判断是否有Overlap的方法是:遍历每个乱序文件里的每个ChunkGroup,在将该乱序ChunkGroup的结束时间依次与每个顺序文件的该设备ChunkGroup的结束时间做比较,若小于则说明该乱序文件与当前遍历到的顺序文件有Overlap。
无论是空间内还是跨空间合并在选择文件的时候都没有读取对应的.mods文件判断数据是否被删除。
空间内合并的文件选择器AbstractInnerSpaceCompactionSelector
- 重要属性
- tsFileResources:文件链表,存放着该虚拟存储组下的该时间分区下的所有顺序或者乱序文件。
- sequence:顺序or乱序
- taskFactory:任务工厂,负责为每批待合并的TsFile创建一个合并任务
SizeTieredCompactionSelector选择器
- 重要方法
1) selectAndSubmit():返回是否找到文件并提交合并任务
//该方法从0层向最高层,逐层搜索是否有待被合并的一批批文件,若某层上有一批以上的待合并文件,则不再向高层搜索,并且为每批文件创建合并线程,并放入合并任务管理器CompactionTaskManager的任务等待队列里
- readLock();
- 创建任务优先级队列taskPriorityQueue;//负责存放每批待合并文件的TsFileResource列表和文件的总大小
- 获取所有的顺序或者乱序文件里空间内合并的最大层数maxLevel=searchFileMaxLevel();
- 从第0层开始向最高层,针对每一层执行如下文件搜索
- if(!selectLevelTask(currentLevel,taskPriorityQueue))//若当前层搜索到一批以上待合并的文件,则把他们的TsFileResource列表和总文件大小放入taskPriorityQueue队列里,且不继续向高层搜索文件
- break
- 遍历taskPriorityQueue里每个元素
- 使用taskFactory对任务优先级队列里的该批文件创建合并线程,并放入合并任务管理器CompactionTaskManager的任务等待队列里
2) selectLevelTask(currentLevel,taskPriorityQueue) :返回是否要向高层level继续搜索
//该方法搜索第level层上的所有文件,若该层上有连续的文件满足系统预设的条件(数量超过10个或者总文件大小超过2G)则为该批文件创建一个合并任务放进taskPriorityQueue队列里,并继续搜索下一批。若在该层上搜索到至少一批以上待合并文件,则返回false(示意不再向高层搜索),否则返回true。
- 创建临时选中的文件列表selectedFileList和选中的文件总大小selectedFileSize
- 创建Boolean变量shouldContinueToSearch=true。//当在该层找到了一批以上的待合并的文件后则会把该变量设为false,示意文件选择器不再往高层
- 依次遍历该虚拟存储组的该时间分区下的所有顺序文件,
- 若该顺序文件的空间内合并层级不等于currentLevel
- 清空selectedFileList和selectedFileSize,遍历下一个顺序文件
- 否则该顺序文件的空间内合并层级等于currentLeve,则把他加入到selectedFileList,并且selectedFileSize加上该文件大小
- 若 (被选中的文件数量>=系统预设值为10个) || (被选中的文件的总大小>=系统预设值为2GB)
- 把该批选中的文件列表及其总文件大小放入taskPriorityQueue队列里。并清空selectedFileList和selectedFileSize,将shouldContinueToSearch设为false
- 创建临时选中的文件列表selectedFileList和选中的文件总大小selectedFileSize
- 若该顺序文件的空间内合并层级不等于currentLevel
跨空间合并的文件选择器AbstractCrossSpaceCompactionSelector
- 重要属性
- sequenceFileResources:顺序文件链表,存放着该虚拟存储组下的该时间分区下的所有顺序文件。
- unsequenceFileResources:乱序文件链表,存放着该虚拟存储组下的该时间分区下的所有乱序文件。
- taskFactory:任务工厂,负责为每批待合并的TsFile创建一个合并任务
- 主要流程
- 从虚拟存储组下该时间分区里的所有顺序和乱序文件中,选取“候选文件”:
- 候选顺序文件:过滤掉isNotExist、isDelete、isOverTTL的文件。
- 候选乱序文件:从第一个文件开始
- 若遇到isNotExist || isDelete || isOverTTL 的则跳过
- 若遇到 isCompacting || isCompactionCandidate || isNotClosed 则返回
- 否则添加到候选乱序文件中
- 在指定的时间内(30s)选取“源文件”,即从候选文件中选取与乱序文件有重叠的顺序文件:
- 遍历乱序文件,寻找与该乱序文件有重叠的所有顺序文件:(使用loose评估内存)
- 若顺序文件中不存在isCompacting、isCompactionCandidate的,则检查并用评估它们在合并中的内存开销是否超过阈值,若否,则将该批乱序和对应的顺序文件放入源文件中
- 若顺序文件中存在一个及以上isCompacting、isCompactionCandidate的,则丢弃该乱序和对应的顺序文件,并停止选取源文件。
- 若选取的乱序文件数量为0,则使用tight评估重新选取源文件
- 遍历乱序文件,寻找与该乱序文件有重叠的所有顺序文件:(使用loose评估内存)
- 若源顺序文件或者源乱序文件列表中有一个为0,则放弃此合并任务。否则对所有的源文件set isCompactionCandidate为true,并把该任务丢进等待队列里。
RewriteCrossSpaceCompactionSelector选择器
- 重要方法
1)selectAndSubmit():返回是否找到文件并提交合并任务
//该方法根据跨空间合并的文件选择策略创建具体的文件选择器,使用文件选择器选取该虚拟存储组下该时间分区下的所有待被合并的乱序文件和顺序文件,并为他们创建一个合并任务放入合并任务管理器CompactionTaskManager的任务等待队列里
- 若 (当前正在合并的线程数量>=系统预设的数量为10)||(系统预设为不允许跨空间合并:!enableCrossSpaceCompaction)||(当前虚拟存储组的当前时间分区存在合并任务) //有个基类,跨空间和空间内合并的文件选择器的基类
- return false
- 若该存储组的该时间分区下不存在顺序文件或乱序文件
- return false //注意:当只有乱序文件而没有顺序文件时,则应该移动或者重写此乱序文件(可能顺序文件的数据被删掉了,而乱序文件还存在)
- 若 (当前正在合并的线程数量>=系统预设的数量为10)||(系统预设为不允许跨空间合并:!enableCrossSpaceCompaction)||(当前虚拟存储组的当前时间分区存在合并任务) //有个基类,跨空间和空间内合并的文件选择器的基类
- 若乱序文件的数量>系统预设的合并任务最大的文件数量为10
- 截取前几个乱序文件,数量为maxCompactionCandidateFileNum
- 创建跨空间合并的资源管理器mergeResource
- 根据系统预设的跨空间合并的文件选择策略(Max_File_Num或者Max_Series_Num)创建具体的文件选择器ICrossSpaceMergeFileSelector
- crossSpaceMergeFileSelector.select(),使用具体的文件选择器选取所有的待被合并的乱序文件和顺序文件,装入mergeFiles数组里,第一个元素是待合并的顺序文件列表,第二个元素是待合并的乱序文件列表
- 若mergeFiles长度为0
- return false
- mergeResource.clear()
- mergeResource.setCacheDeviceMeta()
- 使用任务工厂taskFactory和选中待合并的乱序和顺序文件创建合并任务,并添加到合并任务管理器CompactionTaskManager的任务等待队列里
- return true
- 若乱序文件的数量>系统预设的合并任务最大的文件数量为10
文件选择器ICrossSpaceMergeFileSelector
RewriteCompactionFileSelector
下面详细介绍选择待合并文件的方法:
- select()方法:返回的数组里第一个元素是待合并的顺序文件列表,第二个元素是待合并的乱序文件列表
1)具体流程:在系统预设的时间内(30秒),针对每个乱序文件查找与其Overlap且还未被此次合并任务文件选择器选中的顺序文件列表,每找到一个乱序文件及其对应的Overlap顺序文件列表后就预估他们进行合并可能增加的额外内存开销(优先使用loose估计),若未超过系统给合并线程预设的内存开销,则把他们放入到此合并任务选中的顺序和乱序文件里。更新该虚拟存储组下该时间分区的跨空间合并资源管理器里的顺序文件和乱序文件列表,移除其未被选中的文件的SequenceReader以清缓存。
2)计算合并某一乱序文件与其对应的Overlap的顺序文件会新增的内存开销的方法:
//loose估计出的大小会偏大、误差大;而tight估计出的大小精确到具体合并的有多少序列,误差小
(1)Loose估计:乱序文件占用的大小(即整个乱序文件大小,因为可能会读取出乱序文件里的所有Chunk)+顺序文件占用的大小(由于每次只读取一个顺序文件,因此cost记录顺序文件里最大的metadata大小;写入的时候每个文件都会有metadata,因此cost还要把所有顺序文件的metadata加起来)
(2)Tight估计:计算获取查询该乱序文件里concurrentMergeNum个时间序列共要花费的可能最大内存空间,即(总文件大小*MaxChunkNum/totalChunkNum);计算获取查询该顺序文件里concurrentMergeNum个时间序列共要花费的可能最大内存空间,即(总索引区大小*MaxChunkNum序列最多含有的Chunk数量/totalChunkNum所有序列Chunk数量和)
优先使用loose估计内存的原因是速度快,可是估计结果偏大,误差大,可能导致估计的内存太大超过系统给合并任务预值的内存值,因此该合并任务就没有一个被选中待合并的乱序和对应Overlap的顺序文件。若发生这种情况,再使用tight估计内存来选取待合并文件,它所估计的内存精确到具体要访问几个序列,较精确,可是要访问文件磁盘上每个序列的具体Chunk数量,速度较慢。
2. select(isTight)方法
- 获取系统给合并任务的文件选择预设的时间阈值为30秒·
- 遍历每个乱序文件,且当选择文件的耗时还未超过系统预设值时
- 当被选中待合并的顺序文件的数量 != 所有顺序文件的数量
- selectOverLappedSeqFiles(unseqFile)选中与该乱序文件Overlap的顺序文件
- 若该乱序文件或者与其Overlap的某一顺序文件未关闭或正在merging§ 清空临时变量,遍历下个乱序文件
- 计算合并此乱序文件与其对应Overlap的顺序文件列表可能会新增的内存开销
- 若 合并此乱序文件与其对应Overlap的顺序文件列表可能会新增的内存开销加上原有的其他乱序文件进行合并的开销不会超过系统给合并线程预设的内存开销
- 把选中的乱序文件和对应Overlap的顺序文件放入全局对象列表里
- 当被选中待合并的顺序文件的数量 != 所有顺序文件的数量
3. selectOverLappedSeqFiles(unseqFile)
具体流程:将与该unseqFile乱序文件有Overlap的并且还未被此次合并任务文件选择器选中的顺序文件的索引放入tmpSelectedSeqFiles列表里。具体判断是否有Overlap的做法是:依次遍历获取乱序文件的每个设备ChunkGroup,判断所有还未被此次合并任务选中的顺序文件的该设备ChunkGroup是否有与乱序的ChunkGroup重叠,有的话则选中此顺序文件。
合并任务的恢复
在IOTDB-Server重启后,每个StorageGroup会执行顺序空间和乱序空间内的合并恢复和跨空间的合并恢复。
在StorageGroupProcessor类里:
1)空间内合并恢复
recoverInnerSpaceCompaction(isSeq)方法:对该存储组下的每个时间分区里的每个合并日志创建一个空间内合并恢复线程SizeTieredCompactionRecoverTask,并同步执行恢复流程。
2)跨空间合并恢复
recoverCrossSpaceCompaction()方法:对该存储组下的每个时间分区里的每个合并日志创建一个空间内合并恢复线程RewriteCrossCompactionRecoverTask,并同步执行恢复流程。
合并任务的定时执行
当server启动后,每个虚拟存储组首先会执行顺序空间和乱序空间内的合并恢复和跨空间的合并恢复,执行完后就会回调submitTimedCompactionTask()方法以定时执行合并任务,具体操作如下:
(1) 从该虚拟存储组下的每个时间分区里依次使用合并任务调度器CompactionSchedule根据合并策略去选择一批批待合并文件,并为每批文件创建一个合并任务线程放进CompactionTaskManager里的线程等待队列里
(2) 从等待队列里获取一个合并线程并执行
合并的加锁流程
- 选择待合并的源文件,并设置setCompactionCandidate(true),将它们封装入一个合并任务里,并放入等待队列中。
- 从等待队列里拿出来,依次对每个源文件加读锁,并检查该源文件是否Valid,若isValid则setCompacting(true),否则释放所有源文件的读锁并setCompacting(false)
- 执行合并,并将临时目标文件移动成最终目标.tsfile文件,生成.resource文件,合并compactionMods文件,更新内存:
- 给TsFileManager加写锁
- 更新 TsFileResourceManager,使用Sychronized移除源文件的 TsFileResource 并增加目标文件的 TsFileResource
- 更新TsFileManager的TsFileResourceList,移除源文件的tsfileResource,插入目标文件的tsFileResource
- 给TsFileManager释放写锁
- 依次对每个源文件释放读锁、加写锁,删除源文件和日志。
- 对每个源文件释放写锁、setCompacting(false)、setCompactionCandidate(false).
...
7.1. 开启一个新的ChunkGroup;
7.2. 对于Tc中的每一条时间序列tsi,在S上查询它们的Chunk,并将这些Chunk按照在文件中的位置排序。
7.3. 取出位置最小的Chunk,记做c,如果已经没有Chunk,转7.4
7.3.1 找到c对应的MergeReader ri,如果ri当前的数据点的时间小于等于c的结束时间tend,将ri所有时间不大于tend的点和c合并,并将c写入到wi,转7.3.5;
7.3.2 如果c的点数小于tpt或者c的上一个Chunk已经写入到wi,但是wi还没有进行flush,将c写入到wi,转7.3.5;
7.3.3 如果进行Full Merge,将c写入到wi,转7.3.5;
7.3.4 记录这个没有被merge的Chunk,转7.3;
7.3.5 如果wi中已写入的点大于等于tpt,将wi flush到s'j,转7.3;
7.4 关闭这个ChunkGroup;
...
10.1 如果si中标记已被合并的chunk占总chunk的比例大于某值threshold:
10.1.0 在merge.log中记录“{si'} start {si'的当前长度}”
10.1.1 将si中未标记已被合并的chunk写入si';
10.1.2 为si'生成FileMetadata并写入到si'尾部;
10.1.3 等待对si的所有查询结束,并对si加锁;
10.1.4用si'替代si;
10.1.5 在merge.log中记录“{si} end”, 对si解锁;
10.2 否则:
10.2.1 等待对si的所有查询结束,并对si加锁;
10.2.2 将si的尾部的FileMetadata截去;
10.2.3 在merge.log中记录“{si} {si的当前长度}”
10.2.4 将si'中的数据写到si尾部;
10.2.5 为si生成FileMetadata并写入到si尾部,该FileMetadata中不包含被标记已经合并的chunk,但是记录有多少chunk被标为已经合并;
10.2.6 在merge.log中记录“{si} end”,删除si',对si解锁;
11. 在merge.log中记录“merge end”,删除U中的所有文件和merge.log
存在对齐时间序列的消除乱序文件合并流程
...
- 对于每一个IMeasurementSchema建立IMeasurementSchema→chunkMetadataList的列表
- 读取每一个chunkMetadataList,建立List<List<Chunk>> chunks的结构
- 如果是ChunkMetadata,将当前chunk读出包裹一个List放入chunks
- 如果是VectorChunkMetadata,将当前chunk按timeChunk, valueChunk1,...valueChunkN 的顺序包裹一个List放入chunks
...
- 如果IMeasurementSchema是MeasurementSchema,此时chunk列表的第一个chunk也是完整的chunk,直接写第一个chunk的数据写入ChunkWriterImpl
- 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有本行数据写入 VectorChunkWriterImpl
...