Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. 检查SystemInfo是否为reject状态;如果是,则该写入线程循环sleep 100ms 等待flush线程释放内存,system置回正常状态再进行写入;如果等待6000ms后仍为reject状态,抛出写入异常;
  2. 进入对应的StorageGroupProcessor,获取 进入对应的StorageGroupProcessor,获取 writeLock
  3. 进入对应分区的 TsFileProcessor:(1)获取已有的可写入的顺序或乱序 TsFileProcessor(2)如果没有可写入的TsFileProcessor,创建新的 TsFileProcessor
    1. 统计当前写入计划新增的内存占用,增加至TspInfo和SgInfo中:(1)新测点增加 chunk_metadata(2)新设备增加 unclosed resource(3)TEXT 类型数据(4)TVList 中增加的 PrimitiveArray(5)flush内存
    2. 如果SGInfo变化超过System 上报的阈值(storage_group_size_report_threshold=16M)
      1. 向SystemInfo进行上报;
        synchronized(SystemInfo) {
        1. 更新 SystemInfo 内存占用。
        2. 如果 SystemInfo 内存占用 < 总写入内存 * flush_proportion,返回 true。
        3. 如果 总写入内存 * flush_proportion <= SystemInfo 内存占用 < 总写入内存 * reject_proportion, 触发flush,返回 执行 写入线程的提交flush流程,返回 true。
        4. 如果 总写入内存 * 80% <= SystemInfo 内存占用, SystemInfo 置为 reject 状态,触发flush,返回 false。
          }
      2. 如果返回 false,则该写入线程循环sleep 50ms 检查 false,则该写入线程循环 writeLock.condition.await(50ms) ,检查 SystemInfo 的 reject 状态如果不 reject,执行正常写入。如果等待 max_waiting_time_when_insert_blocked 后仍为reject状态,抛出写入异常
  4. 在 StorageGroupProcessor 里检查 shouldFlush,如果为true,进行异步flush过程
  5. StorageGroupProsessor. 释放writeLock


Flush流程:

flush 分为两种 一种为正常flush,为写入过程中触发,在insertPlan写入完成后正式开始进行异步flush(正常写入流程中flush);另一种为异步即时flush,即触发后立刻开始flush(flush释放内存后,若System总内存仍处于flush阈值之上且当前没有flush任务时触发)

正常flush流程:

  1. 通过上报到system里的sgInfo,找到所有的TSP;
  2. 使用PriorityQueue pop出当前workMemTable内存占用最大的Top K个TSP,将其标记为shouldFlush;如果flush此memtable后系统仍在flush阈值以上,再从PriorityQueue pop出一个TSP做标记,直到这些memtable flush后会回到flush阈值一下或者PriorityQueue为空
  3. 写入完成后在StorageGroupProcessor里检查shouldFlush,如果为true,进行异步flush过程

...

写入线程的提交flush流程:

  1. 获取整个系统当前正在 Flush 的线程个数,如果大于 0,就返回
  2. 使用 PriorityQueue 对当前 TsFileProcessor 占用内存由大到小排序,逐个标记 shouldFlush,直到标记的这些 TsFileProcessor 刷盘后内存能降到 flush 阈值的50%

Flush 线程的提交flush流程:

  1. 获取整个系统当前正在 Flush 的线程个数,如果大于 1,就返回
  2. 通过上报到system里的sgInfo,找到所有的TSP;
  3. 使用PriorityQueue pop出当前workMemTable内存占用最大的Top K个TSP,进行异步flush;如果flush此memtable后系统仍在flush阈值以上,再从PriorityQueue pop出一个TSP进行异步flush,直到这些memtable flush后会回到flush阈值一下或者PriorityQueue为空

...