每个 TsFileProcessor 维护一个 workMemTable 指向处于工作状态的 memtable,同时维护一个 flushingMemtables 队列存储处于待持久化状态的 memtable

持久化的触发条件

每次写数据后都需要检查以下条件,若满足其中之一则调用 TsFileFlushPolicy 的 apply 方法进行持久化

  1. workMemTable 的 shouldFlush 字段被设置为true(内存控制 SystemInfo 会设置此字段)
  2. workMemTable 占用的内存总量超过 memtable_size_threshold 且未开启内存控制(即 enable_mem_control 为 false)
  3. workMemTable 中每条序列的平均数据点超过 avg_series_point_number_threshold,该项检查的目的是为了防止内存中 Chunk 数据点过多造成查询时排序时间过长

乱序 Memtable 的定时持久化

  • 使用场景:当乱序数据点积累较慢时,乱序 memtable 无法及时持久化到磁盘,也因此无法被及时进行跨空间合并,导致查询性能无法及时恢复。
  • 该功能包含三个参数
    • enable_timed_flush_unseq_memtable:是否开启乱序 memtable 的定时持久化功能
    • unseq_memtable_flush_interval_in_ms:乱序 memtable 从创建到持久化的时间间隔(单位:ms)
    • unseq_memtable_flush_check_interval_in_ms:定时持久化的检查间隔(单位:ms)
  • 功能描述:定期检查每个乱序 memtable 是否满足(系统当前时间 - unseq_memtable_flush_interval_in_ms > memtable 的创建时间),若满足则将该 memtable 持久化

持久化管理器 FlushManager

  • memtable 的持久化任务管理器,进行 memtable 的异步持久化

  • 一个 TsFileProcessor 可能对应多个需要持久化的 memtable,但同一时刻每个 TsFileProcessor 最多只有一个 flush 任务执行,目的是防止对同一个 tsfile 进行并发写入

  • 通过 registerTsFileProcessor 方法注册需要持久化的 memtable 所对应的 TsFileProcessor,共有两个注册源
    • TsFileProcessor:在需要 flush 或 close 时注册自己

    • 持久化子线程 FlushThread:在一个 memtable 的持久化任务结束后,再次注册其对应的 TsFileProcessor 来检查是否仍存在其他需要持久化的 memtable

持久化任务 MemtableFlushTask

持久化过程采用流水线的方式,共分为三个阶段、两个任务队列

三个持久化阶段

  • 排序阶段:MemTableFlushTask 所在线程,负责给每个 measurement 对应的 chunk 排序
  • 编码阶段:encodingTask 线程,负责给每个 Chunk 进行编码,编码成字节数组
  • I/O阶段:ioTask 线程,负责将编码好的 Chunk 持久化到磁盘的 TsFile 文件上

两个任务队列

使用两个 ConcurrentLinkedQueue<Object> 作为任务队列,进行线程间交互

  • encodingTaskQueue:排序线程 → 编码线程,包括三种任务

    • StartFlushGroupIOTask:开始持久化一个 device (ChunkGroup), encoding 不处理这个命令,直接发给 IO 线程

    • Pair<TVList, MeasurementSchema>:编码一个 Chunk

    • EndChunkGroupIoTask:结束一个 device (ChunkGroup) 的持久化,encoding 不处理这个命令,直接发给 IO 线程

  • ioTaskQueue:编码线程 → IO线程,包括三种任务

    • StartFlushGroupIOTask:开始持久化一个 device (ChunkGroup)

    • IChunkWriter:持久化一个 Chunk 到磁盘上

    • EndChunkGroupIoTask:结束一个 device (ChunkGroup) 的持久化


  • No labels