Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. 如果是非空的写入线程
    • 在 StorageEngine 中检查SystemInfo是否为reject状态;如果是,则该写入线程循环sleep 100ms(等待flush线程释放内存,system置回正常状态)再进行写入;如果等待6000ms后仍为reject状态,抛出写入异常;
    • 进入对应的StorageGroupProcessor,获取 writeLock
    • 进入对应分区的 TsFileProcessor:(1)获取已有的可写入的顺序或乱序 TsFileProcessor(2)如果没有可写入的TsFileProcessor,创建新的 TsFileProcessor
      • 统计当前写入计划新增的内存占用,增加至TspInfo和SgInfo中:(1)新测点增加 chunk_metadata(2)TEXT 类型数据(3)TVList 中增加的 PrimitiveArray(4)flush内存
      • 如果 SGInfo 增量超过阈值(storage_group_size_report_threshold=16M)
        • 向SystemInfo进行上报(将当前 TsFileProcessor 传入);
          synchronized(SystemInfo) {
          • 更新 SystemInfo 内存占用。
          • 如果 SystemInfo 内存占用 < 总写入内存 * flush_proportion,返回 true。
          • 如果 总写入内存 * flush_proportion ≤ SystemInfo 内存占用 < 总写入内存 * reject_proportion, 执行 写入线程的提交flush流程 选择Memtable提交flush流程,返回 true。
          • 如果 总写入内存 * reject_proportion ≤ SystemInfo 内存占用, SystemInfo 置为 reject 状态, 执行 写入线程的提交flush流程 选择Memtable提交flush流程,记返回值为 flag
            • 如果 flag = true 
              • 如果 SystemInfo 内存占用 < 总写入内存,则返回 true
              • 如果 SystemInfo 内存占用 ≥ 总写入内存,直接抛 写入Reject 异常
            • 如果 flag = false,则返回 false
              }
        • 判断 向SystemInfo上报 的返回结果
          • 如果返回 false,则该写入线程循环 writeLock.condition.await(50ms) ,检查 SystemInfo 的 reject 状态如果不 reject,执行正常写入。如果等待 max_waiting_time_when_insert_blocked 后仍为reject状态,抛出写入异常
          • 如果返回 true,则执行正常写入
          • 如果捕获到 写入Reject 异常,reset SystemInfo,并继续向上抛
      在 StorageGroupProcessor 里检查 shouldFlush,如果为true,进行异步flush过程
      • 检查 workingMemtable 的 shouldFlush,如果为true,提交 Flush 任务,并根据文件大小判断是否需要 close。
    • StorageGroupProsessor. 释放writeLock
  2. 如果是空的写入线程
    • 进入对应的 StorageGroupProcessor,获取 writeLock
    • 获取对应分区的 TsFileProcessor:如果(其 workingMemtable 不为空且 shouldFlush 为 true),则提交 flush 任务;否则直接返回。
    • StorageGroupProsessor. 释放writeLock

Flush流程:

flush 分为两种 一种为正常flush,为写入过程中触发,在insertPlan写入完成后正式开始进行异步flush(正常写入流程中flush);另一种为异步即时flush,即触发后立刻开始flush(flush释放内存后,若System总内存仍处于flush阈值之上且当前没有flush任务时触发)


写入线程的提交flush流程选择Memtable提交flush流程

  1. 使用 PriorityQueue 对当前系统所有 memtable 按占用内存由大到小排序
  2. boolean flag = false
  3. 当前活跃内存 = SystemInfo 总内存 - SystemInfo flush内存
  4. 对 PriorityQueue 的每个 workingMemtable 逐个标记 shouldFlush,(直到标记的这些 TsFileProcessor 刷盘后 当前活跃内存 能降到 flush 阈值之下)
    1. 提交一个异步的空的写入线程(写入被标记的 Memtable 中)
    2. 判断此 workingMemtable 是否属于当前 TsFileProcessor,如果属于,flag = true
  5. 返回 flag


Flush 流程:线程:

  1. 先更新 SystemInfo flush 内存
  2. 将 workingMemtable 移到 flushingMemtables 中

关闭TsFile文件逻辑:

文件封口触发逻辑与现有master版本相同,都为一个insertPlan写入完成后,检查该TSP是否需要flush,如果需要,再检查是否TsFile大小超过阈值,如果超过,flush memtable后将文件封口。

...


TsFile文件关闭逻辑:

  1. 一个insertPlan写入完成后,检查该TSP的 workingMemtable 的 shouldFlush 字段,如果为 true,再检查是否TsFile大小超过阈值,如果超过,flush memtable后将文件封口。
  2. TsFile关闭完成后,清空该TSPInfo,重置对应的 SGInfo 状态,并向SystemInfo报告重置后SGInfo
    1. 如果此时SystemInfo 为reject状态 且 `SystemInfo中统计的总内存 < 总写入内存 *
  3. 80%`,将SystemInfo
    1. reject_proportion`,将SystemInfo 置于正常状态
  4. 如果`SystemInfo中统计的总内存
    1. 如果`SystemInfo 总内存 >= 总写入内存 * 50%`,触发即时flush

 

MTree内存控制:

注册时间序列时,如果总时间序列个数*300 > 总内存*0.1,此时拒绝注册,抛出异常。

...