单行数据(一个设备单个时间戳的多个物理量)写入流程


对应的接口

  • JDBC 的 execute 和 executeBatch 接口
  • Session 的 insertRecord 和 insertRecords


总入口:public void insert(InsertRowPlan insertRowPlan)   StorageEngine.java

  • 找到对应的 StorageGroupProcessor
  • 根据写入数据的时间以及当前设备落盘的最后时间戳,找到对应的 TsFileProcessor
  • 写入 TsFileProcessor 对应的 memtable 中
    • 如果是乱序文件,则更新 tsfileResource 中的 endTimeMap
    • 如果 tsfile 中没有该设备的信息,或新插入数据的时间小于已存 startTime,则更新 tsfileResource 中的 startTimeMap
  • 记录写前日志
  • 根据 memtable 大小,来判断是否触发异步持久化 memtable 操作
  • 如果是顺序文件且执行了刷盘动作,则更新 tsfileResource 中的 endTimeMap
  • 根据当前磁盘 TsFile 的大小,判断是否触发文件关闭操作


多行数据(一个设备多个时间戳的多个物理量)写入流程


对应的接口

  • Session 的 insertTablet


总入口:public void insertTablet(InsertTabletPlan insertTabletPlan)  StorageEngine.java

  • 找到对应的 StorageGroupManager
  • 根据设备ID找到对应的虚拟存储组 VirtualStorageGroupProsessor
  • 根据时间分区以及各分区的最大持久化时间戳,将这批数据分成小批,分别对应到一个 TsFileProcessor 中
  • 分别将每小批写入 TsFileProcessor 对应的 memtable 中
    • 如果是乱序文件,则更新 tsfileResource 中的 endTimeMap
    • 如果 tsfile 中没有该设备的信息,或新插入数据的时间小于已存 startTime,则更新 tsfileResource 中的 startTimeMap
  • 记录写前日志
  • 根据 memtable 大小,来判断是否触发异步持久化 memtable 操作
    • 如果是顺序文件且执行了刷盘动作,则更新 tsfileResource 中的 endTimeMap
  • 根据当前磁盘 TsFile 的大小,判断是否触发文件关闭操作


写入顺序图

Memtable 结构

为节省内存空间,不保留device全路径,而是使用其ID作为HashMap的key

// device ID -> (measurement name -> IWritableMemChunk)
Map<DeviceID, <String, IWritableMemChunk>> memTableMap;

StorageGroupProcessor关于每个设备flush时间的四个map

使用ID表保存每个时间序列最新时间分区的最后刷盘时间,需要被替换的已有结构:

// 记录最后写入时间
private Map<Long, Map<String, Long>> latestTimeForEachDevice = new HashMap<>();
  
// 记录最后刷盘时间 
private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice = new HashMap<>();
  
// 升级时使用
private Map<Long, Map<String, Long>> newlyFlushedPartitionLatestFlushedTimeForEachDevice = new HashMap<>();
  
// 全局刷盘时间
private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
  • No labels