Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

compaction_target_tsfile_size=2000000000 (合并的目标文件大小)

compaction_cross_space_max_select_unseq_file_num=100(跨文件空间合并一次最多提交的乱序文件个数)

TsFileManagement 文件管理器(每个虚拟存储组一个)

...

  • hasSelected = true
  • while (hasSelected && currentTaskNum < compaction_concurrent_thread)
    • hasSelected = selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
    • hasSelected |= selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
    • hasSelected |= crossSpaceCompactionselectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表);


doCompactionInInnerSpacePriority

  • selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
  • selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)
  • crossSpaceCompactionselectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表);


doCompactionInCrossSpacePriority

  • crossSpaceCompactionselectAndSubmitCrossSpaceFiles(顺序文件链表, 乱序文件链表)
  • selectAndSubmitInSpaceFiles(顺序文件链表,isSeq=true)
  • selectAndSubmitInSpaceFiles(乱序文件链表,isSeq=false)

...

  • hasSelect = false
  • 候选文件列表 Flist=[]
  • 倒序遍历文件列表
    • 如果  (任务已经足够多:currentTaskNum >= compaction_concurrent_thread)
            || (无需执行顺序合并: isSeq && !enable_seq_space_compaction)
            || (无需执行乱序合并: !isSeq && !enable_unseq_space_compaction)
      • return hasSelected
    • 如果(当前文件大小 > target_tsfile_size || 当前文件正在合并 || 当前文件未关闭)
      • 清空 FList;
      • continue;
    • 将当前文件加入 FList
    • 如果 FList 内所有文件总大小大于 compaction_target_tsfile_size || FList 内文件数达到 compaction_candidate_file_num
      • 将 FList 拷贝提交异步任务(空间内合并执行流程), FList=[],++currentTaskNum, hasSelected=true
      • 如果 compaction_priority=balance
        • break;
  • return hasSelected


crossSpaceCompactionselectAndSubmitCrossSpaceFiles(顺序文件链表 S, 乱序文件链表 U): 返回是否选中文件做合并

  • hasSelected = false
  • 如果 currentTaskNum >= compaction_concurrent_thread
           || !enable_cross_space_compaction
    • return hasSelected;
  • 候选乱序文件 UF = []
  • 候选顺序文件 SF 候选文件 F = []
  • hasSelected = false
  • for 乱序文件 u in  乱序文件链表 U
    • 临时选中顺序文件 TmpSF = [] 
    • for 顺序文件 s
    in  顺序文件链表
    • in 顺序文件链表 S
        if isOverlap
        • if isOverlap(
        U, s) && memoryCost(F, s) < memory_budget
        • 将 s 添加进 F 中
        • hasSelected = true
      将 F 提交异步任务(跨文件空间合并执行流程)
        • u, s)
          • TmpSF.add(s)
      • if not (u 未关闭 || u 正在合并 || s in TmpSF 未关闭 || s in TmpSF 正在合并)
        • UF.add(u); SF.addAll(TmpSF);
        • if UF.size() < compaction_cross_space_max_select_unseq_file_num
          • continue
      • 将 UF 和 SF 提交异步任务(跨空间合并执行流程), UF=[],SF=[], ++currentTaskNum,
      • ++currentTaskNum
      • hasSelected=true
      • 如果 compaction_priority=balance, break;
    • return hasSelected

    ...