You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 20 Current »

目标

  • 无需在有无乱序数据时分开配置
  • 避免设置活跃的 partition 个数
  • 考虑 PrimitiveArrayPool 内存占用
  • 尽量有效利用内存,使 Chunk 尽量大
  • 尽量保证创建元数据成功,为保证内存不爆,可以拒绝写入
  • 尽量在有无乱序情况下,iotdb参数不需要变化都能很好适应
  • 尽量不阻塞写入
  • 尽量不加入固定参数


写入流程各部分内存统计


RPC模块

  • 一次请求的大小受限制 thrift_max_frame_size=67108864.(防止许用户一条SQL写入1亿个点等场景;或者写了一个大于2GB的bytes[]).
  • 并发数受限制 rpc_max_concurrent_client_num=65535。


核心思想:

  • Schema和历史resource单独分配大小;下文仅考虑其余写数据部分大小。
  • 每个SG统计自身的chunk_metadata和unseal_resource大小;
  • 全局ArrayPool统计buffered和out of buffer的array大小
  • 系统统计总的大小

数据写入流程


写入线程

  1. 如果是非空的写入线程
    • 在 StorageEngine 中检查SystemInfo是否为reject状态;如果是,则该写入线程循环sleep 50ms(等待flush线程释放内存,system置回正常状态)再进行写入;如果等待max_waiting_time_when_insert_blocked后仍为reject状态,抛出写入异常;
    • 进入对应的StorageGroupProcessor,获取 writeLock
    • 进入对应分区的 TsFileProcessor:(1)获取已有的可写入的顺序或乱序 TsFileProcessor(2)如果没有可写入的TsFileProcessor,创建新的 TsFileProcessor
      • 统计当前写入计划新增的内存占用,增加至TspInfo和SgInfo中:(1)新测点增加 chunk_metadata(2)TEXT 类型数据(3)TVList 中增加的 PrimitiveArray(4)flush内存
      • 如果 SGInfo 增量超过阈值(storage_group_size_report_threshold=16M)
        • 向SystemInfo进行上报(将当前 TsFileProcessor 传入);
          synchronized(SystemInfo) {
          • 更新 SystemInfo 内存占用。
          • 如果 SystemInfo 内存占用 < 总写入内存 * flush_proportion,返回 true。
          • 如果 总写入内存 * flush_proportion ≤ SystemInfo 内存占用 < 总写入内存 * reject_proportion, 执行 选择Memtable提交flush流程,返回 true。
          • 如果 总写入内存 * reject_proportion ≤ SystemInfo 内存占用, SystemInfo 置为 reject 状态, 执行 选择Memtable提交flush流程记返回值为 flag
            • 如果 flag = true 
              • 如果 SystemInfo 内存占用 < 总写入内存,则返回 true
              • 如果 SystemInfo 内存占用 ≥ 总写入内存,直接抛 写入Reject 异常
            • 如果 flag = false,则返回 false
              }
        • 判断 向SystemInfo上报 的返回结果
          • 如果返回 false,则该写入线程循环 sleep (50ms) ,检查 SystemInfo 的 reject 状态如果不 reject或者该memtable被标记为shouldFlush,执行正常写入。如果等待 max_waiting_time_when_insert_blocked 后仍为reject状态,抛出写入异常
          • 如果返回 true,则执行正常写入
          • 如果捕获到 写入Reject 异常,reset SystemInfo,并继续向上抛
      • 检查 workingMemtable 的 shouldFlush,如果为true,提交 Flush 任务,并根据文件大小判断是否需要 close。
    • StorageGroupProsessor. 释放writeLock
  2. 如果是空的写入线程
    • 进入对应的 StorageGroupProcessor,获取 writeLock
    • 获取对应分区的 TsFileProcessor:如果(其 workingMemtable 不为空且 shouldFlush 为 true),则提交 flush 任务;否则直接返回。
    • StorageGroupProsessor. 释放writeLock


选择Memtable提交flush流程

  1. 使用 PriorityQueue 对当前系统所有 memtable 按占用内存由大到小排序
  2. boolean flag = false
  3. 当前活跃内存 = SystemInfo 总内存 - SystemInfo flush内存
  4. 对 PriorityQueue 的每个 workingMemtable 逐个标记 shouldFlush,(直到标记的这些 TsFileProcessor 刷盘后 当前活跃内存 能降到 flush 阈值之下)
    1. 提交一个异步的空的写入线程(写入被标记的 Memtable 中)
    2. 判断此 workingMemtable 是否属于当前 TsFileProcessor,如果属于,flag = true
  5. 返回 flag


Flush 线程:

  1. 先更新 SystemInfo flush 内存
  2. 将 workingMemtable 移到 flushingMemtables 中


TsFile文件关闭逻辑:

  1. 一个insertPlan写入完成后,检查该TSP的 workingMemtable 的 shouldFlush 字段,如果为 true,再检查是否TsFile大小超过阈值,如果超过,flush memtable后将文件封口。
  2. TsFile关闭完成后,清空该TSPInfo,重置对应的 SGInfo 状态,并向SystemInfo报告重置后SGInfo
  3. 如果此时SystemInfo 为reject状态 且 `SystemInfo中统计的总内存 < 总写入内存 * reject_proportion`,将SystemInfo 置于正常状态

 

MTree内存控制:

注册时间序列时,如果总时间序列个数*estimate_series_size > 总内存*write_read_schema_free_memory_proportion:schema,此时拒绝注册,抛出异常。



相关参数整理


  1. 是否开启内存控制
    enable_mem_control=true
    以下参数只在开启内存控制时生效: 
    1. flush阈值(0.0--1.0)(关闭内存控制后无效)
      当所有memtable实际占用大于总写入内存 * flush_proportion,触发flush
      flush_proportion=0.4
    2. reject阈值(0.0--1.0)(关闭内存控制后无效)
      当所有memtable实际占用大于总写入内存 * reject_proportion,阻塞写入,等待flush释放内存
      reject_proportion=0.8
    3. array pool内存占总写入内存的大小比例(0.0--1.0)
      buffered_arrays_memory_proportion=0.6
    4. sg上报阈值(bytes)(关闭内存控制后无效)
      当一个sg内所有memtable的内存相比上次上报的增量大于这个值,向SystemInfo更新目前Sg的总大小
      注意:当sg较多,例如1000时,需要考虑调小这个值。因为此时memtable需要写 16M * 1000 = 16G 才会向SystemInfo 汇报,比较危险。
      storage_group_report_threshold=16777216
    5. 阻塞写入后的检查周期(ms)(关闭内存控制后无效)
      当写入被SystemInfo拒绝后,客户端线程会以这个时间周期去检查SystemInfo的状态,直到flush线程释放掉一些内存,SystemInfo置回正常状态。
      check_period_when_insert_blocked=50
    6. 阻塞写入后的最大等待时间(ms)(关闭内存控制后无效)
      当写入阻塞时间超过这个值后,向客户端返回写入异常
      max_waiting_time_when_insert_blocked=10000
    7. 预估一条序列在mtree中的大小(关闭内存控制后无效)
      这个值用来限制可注册的序列个数
      estimated_series_size=300
  2. memtable大小阈值(bytes)(开启内存控制后无效)
    memtable_size_threshold=1073741824
  3. 写入、查询、schema、剩余内存占比
    其中可注册的序列的个数由schema的内存(byte)除以estimated_series_size来确定
    write_read_schema_free_memory_proportion=4:3:1:2
  4. array pool中的array长度
    平均每个chunk的点数小于这个值时会造成内存的浪费,可以考虑调小这个值
    primitive_array_size=128
  5. TsFile大小阈值
    0代表只刷一个memtable就关文件;1代表刷两个memtable才关文件;大于1时如1G时,1个文件中会存在更多的memtable
    目前设为1是考虑merge会把小文件合并为大文件;而且如果这个阈值过大且开启内存控制后,会导致内存中metadata积累较多,memtable越来越小
    tsfile_size_threshold=1
  6. 平均chunk点数阈值(个数)
    当memtable内平均每个序列的点数超过这个阈值时,触发flush
    avg_series_point_number_threshold=10000




优点:

  1. 所有SG共享内存,不再对每个SG单独设置一个内存上限,因此创建序列(或今后改为序列活跃情况变化)时也不需要再更新SG;好处是内存利用率可以很高;

缺点:

  1. 部分步骤需要全局锁;目前看,假设array为k,SG info 写x延迟上报,则个memTable写入16MB后,会拿一次全局锁更新全局内存情况。


细节:关于Array Pool中分类型的数组如何管理?

解法: 置换策略

a. 按序列注册比例置换

各类型数据在pool中的比例由schema中序列的数据类型比例决定。当pool中buffer已满,但是各类型的array的比例还未调整至schema中的比例时,申请(那些比例应该很高、但是buffer中还很少的)数据类型的size时,先当做OOP,待归还时逐渐将比例调好。另,为了加速比例调整,可以在此处触发flush;

按使用频度置换:

LRU缺点:有类型写入很快、有类型很慢,则抖动太厉害(慢速的加入池中,会踢出快类型的,然后还没被复用,就又被踢走了);LFU:负载变化时适应太慢;2Q。


细节:为什么当SG的info变化超过一定阈值才向SystemInfo汇报?

解法:为了减少全局锁。假设Sys预留1GB出来,有n个SG,那么每个SG info每增长1/n GB数据时,才需要跟全局同步一次,减少contention。


细节:String/byte[] 不需要buffer池的原因。

解:当接收到客户端一个string/byte[]时,接收线程已经占用了这么多内存了,此时将该byte[]直接放到memtable里最合适(指针移动)。


细节:Array Pool中的字符型/byte[]如何管理?因为每个String的长度可能不同。

解法:Array Pool中有List<Binary[]> 用于做array的缓冲池,但是归还时,内部每个Binary均为null。此时有两种方法可选:

  1. 将String类型的数组申请每次都当做array Pool无法响应。缺点是每次都要触发2.2.2,带来全局锁。
  2. 在ArrayPool中虚拟地向Sys info汇报自身拥有不同大小的byte[] 若干个(即byte[][]),当plan中有string时,向arrayPool中申请一个可以容纳的byte[], array Pool中有,则arrayPool做计数,认为该byte[]被借走。flush时在归还。优点是不会增加全局锁,缺点是byte[]长度变化严重时,这里内存利用率会降低。【该方法仍然需要每次都要全局锁。】
  3. 将该部分内存放入SG info中统计。


细节:TsFile什么时候关闭?

解法:指定TsFile的大小;或者由于SGInfo中Chunk_metadata过大导致刷磁盘时进行关闭。


在该方案中,只有图中红色部分是全局锁(ArrayPool, SystemInfo,reject信号量)


详细计算公式:


    • WAL buffer: 一个存储组固定大小的buffer:b(永久)
    • PrimitiveArrayPool 原始类型数组缓存(永久)
      • memtable 写入数据使用
      • memtable 排序使用
      • String数组每次会清空(现状)。
    • TVListAllocator 中 TVList 对象头缓存(永久)
    • MemtablePool 中 Memtable 对象头缓存(永久)
    • flush 之后在内存中积累的 ChunkMetadata 缓存大小为 K(临时)
    • flush编码后的字节数组:一个Chunk编码后的大小(临时)


每个存储组维护 SGinfo: G

    • G=C + U + B + b (ChunkMetadata + UnsealedResources + Bytes[] + WAL)
    • 当前写入plan在该SG下的数据总内存占用为 T

当前注册的时间序列数量 N


ArrayPool维护 ArrayPoolInfo:A

    • Buffered Array 内存 B,已用B_u
    • OOP内存 O
    • A=B_u+O

系统级别维护SystemInfo:S

    • S=A+sum(G
  1. 若写入正常,开始向working memtable中写入数据,当memtable中Array空间不足时,向Array Pool申请新的Array。array pool判断是否有已向系统报备过的该类型array(即Buffered array)
    1. 如果Buffered arrayArray Pool申请Buffered array并写入数据;
    2. 如果没有,则需要申请OOB(out of buffered)的数组

SG个数:M

系统为delay上报预留的内存大小:R

SG上报阈值: R/M


条件:

    • available array:所有measurement都在memtable中有,且空间足够;
    • available buffered array:B > B_u && 相应数据类型的array存在;
    • 左下的update system info:G的增量大于R/M。
    • 生成reject:S>=可写入内存(或达到一定比例,如90%)
    • 左边的call for flush: S >=可写入内存*比例 (如50%)


当一个SG被拆分成多个时间分区时,将上文中的SG改为TSP。


当String类型的flush后,更新tsp中对应的内存统计量并上报给System

TsFile封口时,更新ChunkMetadata和Resource的内存统计并上报给System


文档原链接: https://shimo.im/docs/CWxXTDhvkRrHvXPx

  • No labels