目标

  • 无需在有无乱序数据时分开配置
  • 避免设置活跃的 partition 个数
  • 考虑 PrimitiveArrayPool 内存占用
  • 尽量有效利用内存,使 Chunk 尽量大
  • 尽量保证创建元数据成功,为保证内存不爆,可以拒绝写入
  • 尽量在有无乱序情况下,iotdb参数不需要变化都能很好适应
  • 尽量不阻塞写入
  • 尽量不加入固定参数


所涉及的统计信息类

AbstractMemTable

包括以下两个内存统计值:

  • tvListRamCost:所有TVList被分配的内存总大小,包括TEXT值和primitive arrays中未被占用的空值

  • memSize:数据点实际占用的内存大小,包括TEXT值

两者的关系如图所示:

从图中可以看出,memSize ≤ tvListRamCost

TsFileProcessorInfo

维护一个TsFileProcessor的内存占用,任何内存变动都需要向StorageGroupInfo汇报

  • memCost:所有ChunkMetadata占用的内存大小

StorageGroupInfo

维护一个存储组的内存占用,当内存占用的增量超过指定的阈值时向SystemInfo汇报

  • memoryCost:所有TsFileProcessor占用的ChunkMetadata、primitive arrays和TEXT值的内存总和,即∑ TsFileProcessorInfo.memCost + AbstractMemTable.tvListRamCost

SystemInfo

维护所有存储组的内存占用

  • totalStorageGroupMemCost:所有StorageGroupInfo中memroyCost的总和


写入流程各部分内存统计


RPC模块

  • 一次请求的大小受限制 thrift_max_frame_size=67108864.(防止许用户一条SQL写入1亿个点等场景;或者写了一个大于2GB的bytes[]).
  • 并发数受限制 rpc_max_concurrent_client_num=65535。


核心思想:

  • Schema和历史resource单独分配大小;下文仅考虑其余写数据部分大小。
  • 每个SG统计自身的chunk_metadata和unseal_resource大小;
  • 全局ArrayPool统计buffered和out of buffer的array大小
  • 系统统计总的大小

数据写入流程


写入线程

  1. 如果是非空的写入线程
    • 在 StorageEngine 中检查SystemInfo是否为reject状态;如果是,则该写入线程循环sleep 50ms(等待flush线程释放内存,system置回正常状态)再进行写入;如果等待max_waiting_time_when_insert_blocked后仍为reject状态,抛出写入异常;
    • 进入对应的StorageGroupProcessor,获取 writeLock
    • 进入对应分区的 TsFileProcessor:(1)获取已有的可写入的顺序或乱序 TsFileProcessor(2)如果没有可写入的TsFileProcessor,创建新的 TsFileProcessor
      • 统计当前写入计划新增的内存占用,增加至TspInfo和SgInfo中:(1)新测点增加 chunk_metadata(2)TEXT 类型数据(3)TVList 中增加的 PrimitiveArray(4)flush内存
      • 如果 SGInfo 增量超过阈值(storage_group_size_report_threshold=16M)
        • 向SystemInfo进行上报(将当前 TsFileProcessor 传入);
          synchronized(SystemInfo) {
          • 更新 SystemInfo 内存占用。
          • 如果 SystemInfo 内存占用 < 总写入内存 * flush_proportion,返回 true。
          • 如果 总写入内存 * flush_proportion ≤ SystemInfo 内存占用 < 总写入内存 * reject_proportion, 执行 选择Memtable提交flush流程,返回 true。
          • 如果 总写入内存 * reject_proportion ≤ SystemInfo 内存占用, SystemInfo 置为 reject 状态, 执行 选择Memtable提交flush流程记返回值为 flag
            • 如果 flag = true 
              • 如果 SystemInfo 内存占用 < 总写入内存,则返回 true
              • 如果 SystemInfo 内存占用 ≥ 总写入内存,直接抛 写入Reject 异常
            • 如果 flag = false,则返回 false
              }
        • 判断 向SystemInfo上报 的返回结果
          • 如果返回 false,则该写入线程循环 sleep (50ms) ,检查 SystemInfo 的 reject 状态如果不 reject或者该memtable被标记为shouldFlush,执行正常写入。如果等待 max_waiting_time_when_insert_blocked 后仍为reject状态,抛出写入异常
          • 如果返回 true,则执行正常写入
          • 如果捕获到 写入Reject 异常,reset SystemInfo,并继续向上抛
      • 检查 workingMemtable 的 shouldFlush,如果为true,提交 Flush 任务,并根据文件大小判断是否需要 close。
    • StorageGroupProsessor. 释放writeLock
  2. 如果是空的写入线程
    • 进入对应的 StorageGroupProcessor,获取 writeLock
    • 获取对应分区的 TsFileProcessor:如果(其 workingMemtable 不为空且 shouldFlush 为 true),则提交 flush 任务;否则直接返回。
    • StorageGroupProsessor. 释放writeLock


选择Memtable提交flush流程

  1. 使用 PriorityQueue 对当前系统所有 memtable 按占用内存由大到小排序
  2. boolean flag = false
  3. 当前活跃内存 = SystemInfo 总内存 - SystemInfo flush内存
  4. 对 PriorityQueue 的每个 workingMemtable 逐个标记 shouldFlush,(直到标记的这些 TsFileProcessor 刷盘后 当前活跃内存 能降到 flush 阈值之下)
    1. 提交一个异步的空的写入线程(写入被标记的 Memtable 中)
    2. 判断此 workingMemtable 是否属于当前 TsFileProcessor,如果属于,flag = true
  5. 返回 flag


Flush 线程:

  1. 先更新 SystemInfo flush 内存
  2. 将 workingMemtable 移到 flushingMemtables 中


TsFile文件关闭逻辑:

  1. 一个insertPlan写入完成后,检查该TSP的 workingMemtable 的 shouldFlush 字段,如果为 true,再检查是否TsFile大小超过阈值,如果超过,flush memtable后将文件封口。
  2. TsFile关闭完成后,清空该TSPInfo,重置对应的 SGInfo 状态,并向SystemInfo报告重置后SGInfo
  3. 如果此时SystemInfo 为reject状态 且 `SystemInfo中统计的总内存 < 总写入内存 * reject_proportion`,将SystemInfo 置于正常状态

 

MTree内存控制:

注册时间序列时,如果总时间序列个数*estimate_series_size > 总内存*write_read_schema_free_memory_proportion:schema,此时拒绝注册,抛出异常。



相关参数整理


  1. 是否开启内存控制
    enable_mem_control=true
    以下参数只在开启内存控制时生效: 
    1. flush阈值(0.0--1.0)(关闭内存控制后无效)
      当所有memtable实际占用大于总写入内存 * flush_proportion,触发flush
      flush_proportion=0.4
    2. reject阈值(0.0--1.0)(关闭内存控制后无效)
      当所有memtable实际占用大于总写入内存 * reject_proportion,阻塞写入,等待flush释放内存
      reject_proportion=0.8
    3. array pool内存占总写入内存的大小比例(0.0--1.0)
      buffered_arrays_memory_proportion=0.6
    4. sg上报阈值(bytes)(关闭内存控制后无效)
      当一个sg内所有memtable的内存相比上次上报的增量大于这个值,向SystemInfo更新目前Sg的总大小
      注意:当sg较多,例如1000时,需要考虑调小这个值。因为此时memtable需要写 16M * 1000 = 16G 才会向SystemInfo 汇报,比较危险。
      storage_group_report_threshold=16777216
    5. 阻塞写入后的检查周期(ms)(关闭内存控制后无效)
      当写入被SystemInfo拒绝后,客户端线程会以这个时间周期去检查SystemInfo的状态,直到flush线程释放掉一些内存,SystemInfo置回正常状态。
      check_period_when_insert_blocked=50
    6. 阻塞写入后的最大等待时间(ms)(关闭内存控制后无效)
      当写入阻塞时间超过这个值后,向客户端返回写入异常
      max_waiting_time_when_insert_blocked=10000
    7. 预估一条序列在mtree中的大小(关闭内存控制后无效)
      这个值用来限制可注册的序列个数
      estimated_series_size=300
  2. memtable大小阈值(bytes)(开启内存控制后无效)
    memtable_size_threshold=1073741824
  3. 写入、查询、schema、剩余内存占比
    其中可注册的序列的个数由schema的内存(byte)除以estimated_series_size来确定
    write_read_schema_free_memory_proportion=4:3:1:2
  4. array pool中的array长度
    平均每个chunk的点数小于这个值时会造成内存的浪费,可以考虑调小这个值
    primitive_array_size=128
  5. TsFile大小阈值
    0代表只刷一个memtable就关文件;1代表刷两个memtable才关文件;大于1时如1G时,1个文件中会存在更多的memtable
    目前设为1是考虑merge会把小文件合并为大文件;而且如果这个阈值过大且开启内存控制后,会导致内存中metadata积累较多,memtable越来越小
    tsfile_size_threshold=1
  6. 平均chunk点数阈值(个数)
    当memtable内平均每个序列的点数超过这个阈值时,触发flush
    avg_series_point_number_threshold=10000




优点:

  1. 所有SG共享内存,不再对每个SG单独设置一个内存上限,因此创建序列(或今后改为序列活跃情况变化)时也不需要再更新SG;好处是内存利用率可以很高;

缺点:

  1. 部分步骤需要全局锁;目前看,假设array为k,SG info 写x延迟上报,则个memTable写入16MB后,会拿一次全局锁更新全局内存情况。


细节:关于Array Pool中分类型的数组如何管理?

解法: 置换策略

a. 按序列注册比例置换

各类型数据在pool中的比例由schema中序列的数据类型比例决定。当pool中buffer已满,但是各类型的array的比例还未调整至schema中的比例时,申请(那些比例应该很高、但是buffer中还很少的)数据类型的size时,先当做OOP,待归还时逐渐将比例调好。另,为了加速比例调整,可以在此处触发flush;

按使用频度置换:

LRU缺点:有类型写入很快、有类型很慢,则抖动太厉害(慢速的加入池中,会踢出快类型的,然后还没被复用,就又被踢走了);LFU:负载变化时适应太慢;2Q。


细节:为什么当SG的info变化超过一定阈值才向SystemInfo汇报?

解法:为了减少全局锁。假设Sys预留1GB出来,有n个SG,那么每个SG info每增长1/n GB数据时,才需要跟全局同步一次,减少contention。


细节:String/byte[] 不需要buffer池的原因。

解:当接收到客户端一个string/byte[]时,接收线程已经占用了这么多内存了,此时将该byte[]直接放到memtable里最合适(指针移动)。


细节:Array Pool中的字符型/byte[]如何管理?因为每个String的长度可能不同。

解法:Array Pool中有List<Binary[]> 用于做array的缓冲池,但是归还时,内部每个Binary均为null。此时有两种方法可选:

  1. 将String类型的数组申请每次都当做array Pool无法响应。缺点是每次都要触发2.2.2,带来全局锁。
  2. 在ArrayPool中虚拟地向Sys info汇报自身拥有不同大小的byte[] 若干个(即byte[][]),当plan中有string时,向arrayPool中申请一个可以容纳的byte[], array Pool中有,则arrayPool做计数,认为该byte[]被借走。flush时在归还。优点是不会增加全局锁,缺点是byte[]长度变化严重时,这里内存利用率会降低。【该方法仍然需要每次都要全局锁。】
  3. 将该部分内存放入SG info中统计。


细节:TsFile什么时候关闭?

解法:指定TsFile的大小;或者由于SGInfo中Chunk_metadata过大导致刷磁盘时进行关闭。


在该方案中,只有图中红色部分是全局锁(ArrayPool, SystemInfo,reject信号量)


详细计算公式:


    • WAL buffer: 一个存储组固定大小的buffer:b(永久)
    • PrimitiveArrayPool 原始类型数组缓存(永久)
      • memtable 写入数据使用
      • memtable 排序使用
      • String数组每次会清空(现状)。
    • TVListAllocator 中 TVList 对象头缓存(永久)
    • MemtablePool 中 Memtable 对象头缓存(永久)
    • flush 之后在内存中积累的 ChunkMetadata 缓存大小为 K(临时)
    • flush编码后的字节数组:一个Chunk编码后的大小(临时)


每个存储组维护 SGinfo: G

    • G=C + U + B + b (ChunkMetadata + UnsealedResources + Bytes[] + WAL)
    • 当前写入plan在该SG下的数据总内存占用为 T

当前注册的时间序列数量 N


ArrayPool维护 ArrayPoolInfo:A

    • Buffered Array 内存 B,已用B_u
    • OOP内存 O
    • A=B_u+O

系统级别维护SystemInfo:S

    • S=A+sum(G
  1. 若写入正常,开始向working memtable中写入数据,当memtable中Array空间不足时,向Array Pool申请新的Array。array pool判断是否有已向系统报备过的该类型array(即Buffered array)
    1. 如果Buffered arrayArray Pool申请Buffered array并写入数据;
    2. 如果没有,则需要申请OOB(out of buffered)的数组

SG个数:M

系统为delay上报预留的内存大小:R

SG上报阈值: R/M


条件:

    • available array:所有measurement都在memtable中有,且空间足够;
    • available buffered array:B > B_u && 相应数据类型的array存在;
    • 左下的update system info:G的增量大于R/M。
    • 生成reject:S>=可写入内存(或达到一定比例,如90%)
    • 左边的call for flush: S >=可写入内存*比例 (如50%)


当一个SG被拆分成多个时间分区时,将上文中的SG改为TSP。


当String类型的flush后,更新tsp中对应的内存统计量并上报给System

TsFile封口时,更新ChunkMetadata和Resource的内存统计并上报给System


文档原链接: https://shimo.im/docs/CWxXTDhvkRrHvXPx

  • No labels

1 Comment

  1. Controlling total memory usage by classifying the usage seems very neat. But it has harder time to adapt to dynamic work load and situations. There is only one key on memory control, total memory usage. The mem usage recording by category is necessary because when things goes wrong, IoTDB could identify and report mem usage on each category.

    Restricting time series registration might be too restrictive. Besides, the constant 300 should be the size of metadata of single time series, I suppose. *0.1 might be adjustable, or be emerged with memtable sizes for total memory control.