写入部分组件

DataNode

  • ClientRequestHandler: TSServiceImpl,接收用户的rpc请求
  • Planner: 将请求转为物理计划
  • ClusterPlanRouter: 负责路由物理计划的模块,与ConfigNode进行交互,持有数据分区缓存
  • Coordinator: 每个DataNode都持有的核心中转模块,处理并转发读写请求,并且内部实现集群内部节点通信的rpc方法
  • MetaPuller: 远程拉取元数据的模块,与configNode和SchemaRegion交互,持有元数据分区缓存
  • ClusterPlanExecutor: 物理计划执行器,与共识模块交互
  • StorageEngine: 本地的存储引擎

写入接口

InsertRow

该接口用于写入一个设备中的一行数据(多个物理量),可以写入对齐或非对齐的物理量。

具体流程:

  • 客户端C连接集群内任意DataNode A发送写入请求
  • 如果A的ClientRequestHandler接收到写入请求,使用planner转为物理计划InsertRowPlan,传递给Coordinator处理
  • A在Coordinator中使用ClusterPlanRouter对写入请求进行路由
    • ClusterPlanRouter根据device和时间戳计算出对应的数据分片
    • 本地没有当前写入数据的分区表缓存,向ConfigNode共识组中的节点N发送数据分片(存储组,设备组,时间分片编号)
      • 找到此数据对应的数据分区
      • 找到此数据分区对应的 List<DataRegion 共识组>,选择最后一个作为写入的 DataRegion
        • 如果此数据分区未分配 DataRegion 共识组,则分配一个 DataRegion 共识组
      • 找到此 DataRegion 共识组对应的一组 DataNode 并返回
        • 如果此 DataRegion 共识组没有分配 DataNode,则分配一组 DataNode,并在 DataNode 上初始化共识组
    • A缓存数据分区的信息
  • 根据路由结果,将写入的物理计划转发给B,并携带DataRegion共识组信息
  • B的ClusterRequestHandler接收来自A的物理计划,转给Coordinator处理
  • B在Coordinator传递给本地DataRegion执行
  • DataRegion若发现本地没有写入序列的元数据,则检查此次写入的序列是否已经在集群注册
    • 首先B通过MetaPuller去远程拉去元数据
      • MetaPuller检查本地是否有当前存储组的元数据分区表缓存,若没有则向 ConfigNode 共识组发请求,查找写入点的元数据所在的 SchemaRegion 及所在的 DataNode D
      • B 缓存分区信息,并根据该信息将写入的序列及推断类型发送至SchemaRegion
        • 如果 SchemaRegion 不存在此序列,将这些序列注册到 SchemaRegion(自动注册schema)
        • 如果存在,进行数据类型校验(存在且一致、存在且不一致),接着返回数据类型,编码等序列级别的元信息
      • B根据 SchemaRegion 返回的信息决定抛错或者在本地注册不存在序列的元数据
    • 若元数据拉取成功,B在DataRegion中缓存元数据
  • B将请求发送给共识模块,调用consensusLayer中的write方法,写入成功后返回

InsertRows

该接口用于批量写入,将多个Row请求通过一次rpc来进行执行,写入不具有原子性,在返回结果中包含哪些行被成功写入。

批量写入具体流程:

  • 客户端C连接集群内任意DataNode A发送写入请求
  • 如果A的ClientRequestHandler接收到写入请求,使用planner转为物理计划InsertRowsPlan,传递给Coordinator处理
  • A在Coordinator中使用ClusterPlanRouter对写入请求进行路由
    • ClusterPlanRouter根据device列表和时间戳列表计算出对应的所有数据分片
    • 本地没有当前写入数据的分区表缓存,向ConfigNode共识组中的节点N发送数据分片(存储组,设备组,时间分片编号)
      • 找到此数据对应的数据分区
      • 找到此数据分区对应的 List<DataRegion 共识组>,选择最后一个作为写入的 DataRegion
        • 如果此数据分区未分配 DataRegion 共识组,则分配一个 DataRegion 共识组
      • 找到此 DataRegion 共识组对应的一组 DataNode 并返回
        • 如果此 DataRegion 共识组没有分配 DataNode,则分配一组 DataNode,并在 DataNode 上初始化共识组
    • A缓存数据分区的信息
    • 根据数据分区信息,以将insertRows请求切分成对应每个DataRegion的insertRows子请求,即Map<DataRegion, InsertRows>
  • 根据路由结果,分发属于对应DataRegion的子请求,以节点B为例
  • B的ClusterRequestHandler接收来自A的物理计划,转给Coordinator处理
  • B在Coordinator传递给本地DataRegion执行
  • DataRegion若发现本地没有写入序列的元数据,则检查此次写入的序列是否已经在集群注册
    • 首先B通过MetaPuller去远程拉去元数据
      • MetaPuller检查本地是否有当前存储组的元数据分区表缓存,若没有则向 ConfigNode 共识组发请求,查找写入点的元数据所在的 SchemaRegion 及所在的 DataNode D
      • B 缓存分区信息,并根据该信息将写入的序列及推断类型发送至 SchemaRegion
        • 如果 SchemaRegion 不存在此序列,将这些序列注册到 SchemaRegion(自动注册schema)
        • 如果存在,进行数据类型校验(存在且一致、存在且不一致),接着返回数据类型,编码等序列级别的元信息
      • B根据SchemaRegion 返回的信息决定抛错或者在本地注册不存在序列的元数据
    • 若元数据拉取成功,B在DataRegion中缓存元数据
  • B将请求发送给共识模块,调用consensusLayer中的write方法,写入成功后返回

InsertTablet

该接口用于写入一个设备中的多个数据点,每个数据点包含设备中的物理量以及对应的时间戳和值,推荐使用该方法进行写入,写入不具有原子性。

执行流程按照批量写入流程执行。

InsertMultiTablets

该接口用于批量写入,将多个Tablet请求通过一次rpc来执行,写入不具有原子性,在返回结果中包含哪些行被成功写入。

执行流程按照批量写入流程执行。

InsertRowsOfOneDevice

该接口用于同一个设备下的多行写入,相比于InsertRows,减少了devicePath的重复网络传输开销,写入不具有原子性。

执行流程按照批量写入流程执行。


自动创建元数据

  • DataNode向SchemaRegion拉取元数据(设备全路径)
  • SchemaRegion在共识组内同步(SchemaRegion共识组强一致)
  • SchemaRegion检查设备/时间序列的schema是否存在,若不存在,在SchemaRegion共识组内创建时间序列(CreateTimeSeriesPlan)
  • 向DataNode返回

Coordinator接口定义

public class DataNodeServiceImpl {

	private ServiceProvider ServiceProvider;
	private Coordinator coordinator;

	public TSStatus executeNonQueryPlan(PhysicalPlan plan, ConsensusGroup groupId);

 	TSStatus createSchemaRegion(List<Integer> DataNodeID, int schemaRegionID);

    TSStatus createDataRegion(List<Integer> DataNodeID, int dataRegionID);
     
    TSStatus createDataPartition(List<Integer> DataNodeID, int dataRegionID, long timeInterval);
 
    TSStatus migrateSchemaRegion(int from, int to, int schemaRegionID);
 
    TSStatus migrateDataRegion(int from, int to, int dataRegionID);
 
    TSStatus migrateDataPartition(); // 碎片清扫时迁移数据分区 

	// TODO: 查询接口调用
	TSStatus executeQueryPlan(Physical plan, ConsensusGroup groupId);

	//TODO: 增删节点流程
	TSStatus addDataNode(AddDataNodeRequest request);
	TSStatus removeDataNode(RemoveDataNodeRequest request); 
}


缓存相关接口

ConfigNode

ClusterRequestHandler

DataPartitionResponse PullDataPartitionItem(存储组,设备组,时间片序号)

MetaDataPartitionResponse PullMetaDataPartitionItem(存储组,设备组,时间片序号)

可以进行批量接口的改造

DataNode

ClusterRequestHandler

ClearAllDataPartitionCacheResponse ClearDataPartitionCache()

ClearAllMetaDataPartitionCacheResponse ClearMetaDataPartitionCache()

ClearDataPartitionCacheResponse ClearDataPartitionCache(存储组,设备组,时间片序号)

ClearMetaDataPartitionCacheResponse ClearMetaDataPartitionCache(存储组,设备组,时间片序号)



  • No labels