大致流程

  • 空间内合并的文件选择
    • 空间内合并是从低层向高层搜索某层若满足连续预设数量个文件或文件总大小超过预设则将该批文件封装到一个合并任务里,因此一次搜索某层可能出现好几批待合并文件,分别封装成好几个合并任务


  • 跨空间合并的文件选择
    • 跨空间合并则是针对该虚拟存储组下该时间分区下的所有乱序文件与其有Overlap的顺序文件都封装在一个合并任务里。

    • 具体判断是否有Overlap的方法是:

      • 遍历每个乱序文件里的每个ChunkGroup
        • 遍历每个候选顺序文件,获取unseqStartTime, unseqEndTime, seqStartTime, seqEndTime
          • 若unseqEndTime<seqStartTime && 该乱序文件没有一个overlapped顺序文件,则选中该顺序文件,后面没有overlap顺序文件.
          • 若该顺序文件isNotClosed,则选中该顺序文件,因为不知道它是否有重叠
          • 若unseqEndTime<=seqEndTime,则选中该顺序文件,后续没有overlap顺序文件
          • 若unseqStartTime<=seqEndTime,则选中该顺序文件,后续可能还有overlap顺序文件


  • 无论是空间内还是跨空间合并在选择文件的时候都没有读取对应的.mods文件判断数据是否被删除。


合并任务的文件选择器AbstractCompactionSelector

根据一定的策略选择待被合并TsFile文件,并为每批TsFile文件创建一个合并任务放入合并任务管理器CompactionTaskManager的任务等待队列里。


空间内合并的文件选择器AbstractInnerSpaceCompactionSelector

SizeTieredCompactionSelector选择器

  1. 主要流程
    • 新建临时候选任务队列 tempTaskQueue
    • 遍历所有 TsFileResource,获取当前 TsFileResource 的最高层级 maxSearchLevel
    • currentLevel 从 0 到 maxSearchLevel
      • 新建标志量 submitInCurrentLevel = false 
      • 临时文件列表 tempResourceList 
      • 遍历每一个 TsFileResource
        • 如果当前 tsFileResource 的层级为 currentLevel,并且 !isDeleted() && !isCompacting() && closed() && !isCompactionCandidate()
          • 将其加入到 tempResourceList 中
          • 如果 tempResourceList 的大小到达了目标数目或者其中所有文件体积之和超过了目标大小
            • 将其加入到 tempTaskQueue 中
            • 清空tempResourceList 
            • submitInCurrentLevel = true
      • 如果 submitInCurrentLevel == true
        • break
    • 从 tempTaskQueue 中逐一取出其中的任务
      • 将任务中的所有 tsFileResource 的 isCompactionCandidate 设置为 true
      • 将任务提交到全局的暂存队列中  


  2.  重要属性
    • tsFileResources:文件链表,存放着该虚拟存储组下的该时间分区下的所有顺序或者乱序文件。
    • sequence:顺序or乱序
    • taskFactory:任务工厂,负责为每批待合并的TsFile创建一个合并任务
  3. 重要方法
    • selectAndSubmit():返回是否找到文件并提交合并任务

//该方法从0层向最高层,逐层搜索是否有待被合并的一批批文件,若某层上有一批以上的待合并文件,则不再向高层搜索,并且为每批文件创建合并线程,并放入合并任务管理器CompactionTaskManager的任务等待队列里

      • 创建任务优先级队列taskPriorityQueue//负责存放每批待合并文件的TsFileResource列表和文件的总大小
      • 获取所有的顺序或者乱序文件里空间内合并的最大层数maxLevel=searchFileMaxLevel();
      • 从第0层开始向最高层,针对每一层执行如下文件搜索
      • if!selectLevelTask(currentLevel,taskPriorityQueue)//若当前层搜索到一批以上待合并的文件,则把他们的TsFileResource列表和总文件大小放入taskPriorityQueue队列里,且不继续向高层搜索文件
        • break
      •  遍历taskPriorityQueue里每个元素 
        • 使用taskFactory对任务优先级队列里的该批文件创建合并线程,并放入合并任务管理器CompactionTaskManager的任务等待队列里

 

    •  selectLevelTask(currentLevel,taskPriorityQueue) :返回是否要向高层level继续搜索  

//该方法搜索第level层上的所有文件,若该层上有连续的文件满足系统预设的条件(数量超过10个或者总文件大小超过2G)则为该批文件创建一个合并任务放进taskPriorityQueue队列里,并继续搜索下一批。若在该层上搜索到至少一批以上待合并文件,则返回false(示意不再向高层搜索),否则返回true。

      • 创建临时选中的文件列表selectedFileList和选中的文件总大小selectedFileSize
      • 创建Boolean变量shouldContinueToSearch=true//当在该层找到了一批以上的待合并的文件后则会把该变量设为false,示意文件选择器不再往高层
      • 依次遍历该虚拟存储组的该时间分区下的所有顺序文件,
        • 若该顺序文件的空间内合并层级不等于currentLevel
          • 清空selectedFileListselectedFileSize,遍历下一个顺序文件
        • 否则该顺序文件的空间内合并层级等于currentLeve,则把他加入到selectedFileList,并且selectedFileSize加上该文件大小
        •  (被选中的文件数量>=系统预设值为10) || (被选中的文件的总大小>=系统预设值为2GB
          • 把该批选中的文件列表及其总文件大小放入taskPriorityQueue队列里。并清空selectedFileListselectedFileSize,将shouldContinueToSearch设为false
        • 创建临时选中的文件列表selectedFileList和选中的文件总大小selectedFileSize

 

跨空间合并的文件选择器AbstractCrossSpaceCompactionSelector

RewriteCrossSpaceCompactionSelector选择器

  1. 主要流程
    • 从虚拟存储组下该时间分区里的所有顺序和乱序文件中,选取“候选文件”:
      • 候选顺序文件:过滤掉isNotExistisDeleteisOverTTL的文件。
      • 候选乱序文件:从第一个文件开始
        • 若遇到isNotExist || isDelete || isOverTTL 的则跳过
        • 若遇到 isCompacting || isCompactionCandidate || isNotClosed 则返回
        • 否则添加到候选乱序文件中
    • 在指定的时间内(30s)选取“源文件”,即从候选文件中选取与乱序文件有重叠的顺序文件:
      • 遍历乱序文件,寻找与该乱序文件有重叠的所有顺序文件:(使用loose评估内存)
        • 遍历该乱序文件里的每个ChunkGroup,
          • 遍历每个候选顺序文件,获取unseqStartTime, unseqEndTime, seqStartTime, seqEndTime
            • 若unseqEndTime<seqStartTime && 该乱序文件没有一个overlapped顺序文件,则选中该顺序文件,后面没有overlap顺序文件.
            • 若该顺序文件isNotClosed,则选中该顺序文件,因为不知道它是否有重叠
            • 若unseqEndTime<=seqEndTime,则选中该顺序文件,后续没有overlap顺序文件
            • 若unseqStartTime<=seqEndTime,则选中该顺序文件,后续可能还有overlap顺序文件 
        • 若顺序文件中不存在isCompactingisCompactionCandidate的,则检查并用评估它们在合并中的内存开销是否超过阈值,若否,则将该批乱序和对应的顺序文件放入源文件中
        • 若顺序文件中存在一个及以上isCompactingisCompactionCandidate的,则丢弃该乱序和对应的顺序文件,并停止选取源文件(因为要求源乱序文件必须是连续的)。
      • 若选取的乱序文件数量为0,则使用tight评估重新选取源文件
    • 若源顺序文件或者源乱序文件列表中有一个为0,则放弃此合并任务。否则,对所有的源文件set isCompactionCandidatetrue,并把该任务丢进等待队列里。 


       
  1.  重要属性
    • sequenceFileResources:顺序文件链表,存放着该虚拟存储组下的该时间分区下的所有顺序文件。
    • unsequenceFileResources:乱序文件链表,存放着该虚拟存储组下的该时间分区下的所有乱序文件。
    • taskFactory:任务工厂,负责为每批待合并的TsFile创建一个合并任务
  2. 重要方法
    • RewriteCrossSpaceCompactionSelector的重要方法
      • selectAndSubmit():返回是否找到文件并提交合并任务

//该方法根据跨空间合并的文件选择策略创建具体的文件选择器,使用文件选择器选取该虚拟存储组下该时间分区下的所有待被合并的乱序文件和顺序文件,并为他们创建一个合并任务放入合并任务管理器CompactionTaskManager的任务等待队列里

        • 若 (当前正在合并的线程数量>=系统预设的数量为10)||(系统预设为不允许跨空间合并:!enableCrossSpaceCompaction||(当前虚拟存储组的当前时间分区存在合并任务) //有个基类,跨空间和空间内合并的文件选择器的基类
          • return false
        • 若该存储组的该时间分区下不存在顺序文件或乱序文件
          • return false   //注意:当只有乱序文件而没有顺序文件时,则应该移动或者重写此乱序文件(可能顺序文件的数据被删掉了,而乱序文件还存在)
        • 创建跨空间合并的资源管理器mergeResource,这里会选择“候选文件”。
        • rewriteCompactionFileSelector.select(),使用具体的文件选择器选取所有的待被合并的乱序文件和顺序文件,装入mergeFiles数组里,第一个元素是待合并的顺序文件列表,第二个元素是待合并的乱序文件列表
        • mergeFiles长度为0
          • return false
        • mergeResource.clear()
        • mergeResource.setCacheDeviceMeta()
        • 使用任务工厂taskFactory和选中待合并的乱序和顺序文件创建合并任务,并添加到合并任务管理器CompactionTaskManager的任务等待队列里
        • return true
    • RewriteCompactionFileSelector的重要方法
      • select()方法:返回的数组里第一个元素是待合并的顺序文件列表,第二个元素是待合并的乱序文件列表
        • 具体流程
          • 在系统预设的时间内(30秒),针对每个乱序文件查找与其Overlap且还未被此次合并任务文件选择器选中的顺序文件列表,每找到一个乱序文件及其对应的Overlap顺序文件列表后就预估他们进行合并可能增加的额外内存开销(优先使用loose估计),若未超过系统给合并线程预设的内存开销,则把他们放入到此合并任务选中的顺序和乱序文件里。更新该虚拟存储组下该时间分区的跨空间合并资源管理器里的顺序文件和乱序文件列表,移除其未被选中的文件的SequenceReader以清缓存。
        • 计算合并某一乱序文件与其对应的Overlap的顺序文件会新增的内存开销的方法:

//loose估计出的大小会偏大、误差大;而tight估计出的大小精确到具体合并的有多少序列,误差小

          • Loose估计:乱序文件占用的大小(即整个乱序文件大小,因为可能会读取出乱序文件里的所有Chunk+顺序文件占用的大小(由于每次只读取一个顺序文件,因此cost记录顺序文件里最大的metadata大小;写入的时候每个文件都会有metadata,因此cost还要把所有顺序文件的metadata加起来)
          • Tight估计:计算获取查询该乱序文件里concurrentMergeNum个时间序列共要花费的可能最大内存空间,即(总文件大小*MaxChunkNum/totalChunkNum);计算获取查询该顺序文件里concurrentMergeNum个时间序列共要花费的可能最大内存空间,即(总索引区大小*MaxChunkNum序列最多含有的Chunk数量/totalChunkNum所有序列Chunk数量和)

        优先使用loose估计内存的原因是速度快,可是估计结果偏大,误差大,可能导致估计的内存太大超过系统给合并任务预值的内存值,因此该合并任务就没有一个被选中待合并的乱序和对应Overlap的顺序文件。若发生这种情况,再使用tight估计内存来选取待合并文件,它所估计的内存精确到具体要访问几个序列,较精确,可是要访问文件磁盘上每个序列的具体Chunk数量,速度较慢。


      • select(isTight)方法   
        • 获取系统给合并任务的文件选择预设的时间阈值为30秒·       
        • 遍历每个乱序文件,且当选择文件的耗时还未超过系统预设值时
          • 当被选中待合并的顺序文件的数量 != 所有顺序文件的数量
            • selectOverLappedSeqFiles(unseqFile)选中与该乱序文件Overlap的顺序文件
          • 若该乱序文件或者与其Overlap的某一顺序文件未关闭或正在merging§  清空临时变量,遍历下个乱序文件
          • 计算合并此乱序文件与其对应Overlap的顺序文件列表可能会新增的内存开销
          • 若 合并此乱序文件与其对应Overlap的顺序文件列表可能会新增的内存开销加上原有的其他乱序文件进行合并的开销不会超过系统给合并线程预设的内存开销
            • 把选中的乱序文件和对应Overlap的顺序文件放入全局对象列表里
      • selectOverLappedSeqFiles(unseqFile)
        • 具体流程:将与该unseqFile乱序文件有Overlap的并且还未被此次合并任务文件选择器选中的顺序文件的索引放入tmpSelectedSeqFiles列表里。
        • 具体判断是否有Overlap方法:
          • 遍历每个乱序文件里的每个ChunkGroup
            • 遍历每个候选顺序文件,获取unseqStartTime, unseqEndTime, seqStartTime, seqEndTime
              • 若unseqEndTime<seqStartTime && 该乱序文件没有一个overlapped顺序文件,则选中该顺序文件,后面没有overlap顺序文件.
              • 若该顺序文件isNotClosed,则选中该顺序文件,因为不知道它是否有重叠
              • 若unseqEndTime<=seqEndTime,则选中该顺序文件,后续没有overlap顺序文件
              • 若unseqStartTime<=seqEndTime,则选中该顺序文件,后续可能还有overlap顺序文件
  • No labels