THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- ClientRequestHandler: TSServiceImpl,接收用户的rpc请求
- Planner: 将请求转为物理计划
- ClusterPlanRouter: 负责路由物理计划的模块,与ConfigNode进行交互,持有数据分区缓存
- Coordinator: 每个DataNode都持有的核心中转模块,处理并转发读写请求,并且内部实现集群内部节点通信的rpc方法
- MetaPuller: 远程拉取元数据的模块,与configNode和MManager交互,持有元数据分区缓存远程拉取元数据的模块,与configNode和SchemaRegion交互,持有元数据分区缓存
- ClusterPlanExecutor: 物理计划执行器,与共识模块交互
- StorageEngine: 本地的存储引擎
...
- 客户端C连接集群内任意DataNode A发送写入请求
- 如果A的ClientRequestHandler接收到写入请求,使用planner转为物理计划InsertRowPlan,传递给Coordinator处理
- A在Coordinator中使用ClusterPlanRouter对写入请求进行路由
- ClusterPlanRouter根据device和时间戳计算出对应的数据分片
- 本地没有当前写入数据的分区表缓存,向ConfigNode共识组中的节点N发送数据分片(存储组,设备组,时间分片编号)
- 找到此数据对应的数据分区
- 找到此数据分区对应的 List<VSG List<DataRegion 共识组>,选择最后一个作为写入的 VSGDataRegion
- 如果此数据分区未分配 VSG DataRegion 共识组,则分配一个 VSG DataRegion 共识组
- 找到此 VSG DataRegion 共识组对应的一组 DataNode 并返回
- 如果此 VSG DataRegion 共识组没有分配 DataNode,则分配一组 DataNode,并在 DataNode 上初始化共识组
- A缓存数据分区的信息
- 根据路由结果,将写入的物理计划转发给B,并携带VSG共识组信息根据路由结果,将写入的物理计划转发给B,并携带DataRegion共识组信息
- B的ClusterRequestHandler接收来自A的物理计划,转给Coordinator处理
- B在Coordinator传递给本地VSG执行B在Coordinator传递给本地DataRegion执行
- DataRegion若发现本地没有写入序列的元数据,则检查此次写入的序列是否已经在集群注册VSG若发现本地没有写入序列的元数据,则检查此次写入的序列是否已经在集群注册
- 首先B通过MetaPuller去远程拉去元数据
- MetaPuller检查本地是否有当前存储组的元数据分区表缓存,若没有则向 ConfigNode 共识组发请求,查找写入点的元数据所在的 MManager SchemaRegion 及所在的 DataNode D
- B 缓存分区信息,并根据该信息将写入的序列及推断类型发送至 MManager缓存分区信息,并根据该信息将写入的序列及推断类型发送至SchemaRegion
- 如果 SchemaRegion 不存在此序列,将这些序列注册到 SchemaRegion(自动注册schema)
- 如果 MManager 不存在此序列,将这些序列注册到 MManager(自动注册schema)
- 如果存在,进行数据类型校验(存在且一致、存在且不一致),接着返回数据类型,编码等序列级别的元信息
- B根据MManager B根据 SchemaRegion 返回的信息决定抛错或者在本地注册不存在序列的元数据
- 若元数据拉取成功,B在vsg中缓存元数据若元数据拉取成功,B在DataRegion中缓存元数据
- 首先B通过MetaPuller去远程拉去元数据
- B将请求发送给共识模块,调用consensusLayer中的write方法,写入成功后返回
...
- 客户端C连接集群内任意DataNode A发送写入请求
- 如果A的ClientRequestHandler接收到写入请求,使用planner转为物理计划InsertRowsPlan,传递给Coordinator处理
- A在Coordinator中使用ClusterPlanRouter对写入请求进行路由
- ClusterPlanRouter根据device列表和时间戳列表计算出对应的所有数据分片
- 本地没有当前写入数据的分区表缓存,向ConfigNode共识组中的节点N发送数据分片(存储组,设备组,时间分片编号)
- 找到此数据对应的数据分区
- 找到此数据分区对应的 List<VSG List<DataRegion 共识组>,选择最后一个作为写入的 VSGDataRegion
- 如果此数据分区未分配 VSG DataRegion 共识组,则分配一个 VSG DataRegion 共识组
- 找到此 VSG DataRegion 共识组对应的一组 DataNode 并返回
- 如果此 VSG DataRegion 共识组没有分配 DataNode,则分配一组 DataNode,并在 DataNode 上初始化共识组
- A缓存数据分区的信息
- 根据数据分区信息,以将insertRows请求切分成对应每个vsg的insertRows子请求,即Map<vsg根据数据分区信息,以将insertRows请求切分成对应每个DataRegion的insertRows子请求,即Map<DataRegion, InsertRows>
- 根据路由结果,分发属于对应vsg的子请求,以节点B为例根据路由结果,分发属于对应DataRegion的子请求,以节点B为例
- B的ClusterRequestHandler接收来自A的物理计划,转给Coordinator处理
- B在Coordinator传递给本地VSG执行B在Coordinator传递给本地DataRegion执行
- DataRegion若发现本地没有写入序列的元数据,则检查此次写入的序列是否已经在集群注册VSG若发现本地没有写入序列的元数据,则检查此次写入的序列是否已经在集群注册
- 首先B通过MetaPuller去远程拉去元数据
- MetaPuller检查本地是否有当前存储组的元数据分区表缓存,若没有则向 ConfigNode 共识组发请求,查找写入点的元数据所在的 MManager SchemaRegion 及所在的 DataNode D
- B 缓存分区信息,并根据该信息将写入的序列及推断类型发送至 MManagerSchemaRegion
- 如果 MManager SchemaRegion 不存在此序列,将这些序列注册到 MManager(自动注册schema)SchemaRegion(自动注册schema)
- 如果存在,进行数据类型校验(存在且一致、存在且不一致),接着返回数据类型,编码等序列级别的元信息
- B根据MManager B根据SchemaRegion 返回的信息决定抛错或者在本地注册不存在序列的元数据
- 若元数据拉取成功,B在vsg中缓存元数据若元数据拉取成功,B在DataRegion中缓存元数据
- 首先B通过MetaPuller去远程拉去元数据
- B将请求发送给共识模块,调用consensusLayer中的write方法,写入成功后返回
...
该接口用于同一个设备下的多行写入,相比于InsertRows,减少了devicePath的重复网络传输开销,写入不具有原子性。
执行流程按照批量写入流程执行。
自动创建元数据
- DataNode向MManager拉取元数据(设备全路径)DataNode向SchemaRegion拉取元数据(设备全路径)
- MManager在共识组内同步(MManager共识组强一致)SchemaRegion在共识组内同步(SchemaRegion共识组强一致)
- MManager检查设备SchemaRegion检查设备/时间序列的schema是否存在,若不存在,在MManager共识组内创建时间序列(CreateTimeSeriesPlan)向DataNode返回时间序列的schema是否存在,若不存在,在SchemaRegion共识组内创建时间序列(CreateTimeSeriesPlan)
- 向DataNode返回
Coordinator接口定义
Code Block | ||
---|---|---|
| ||
public class DataNodeServiceImpl {
private ServiceProvider ServiceProvider;
private Coordinator coordinator;
public TSStatus executeNonQueryPlan(PhysicalPlan plan, ConsensusGroup groupId);
TSStatus createSchemaRegion(List<Integer> DataNodeID, int schemaRegionID);
TSStatus createDataRegion(List<Integer> DataNodeID, int dataRegionID);
TSStatus createDataPartition(List<Integer> DataNodeID, int dataRegionID, long timeInterval);
TSStatus migrateSchemaRegion(int from, int to, int schemaRegionID);
TSStatus migrateDataRegion(int from, int to, int dataRegionID);
TSStatus migrateDataPartition(); // 碎片清扫时迁移数据分区
// TODO: 查询接口调用
TSStatus executeQueryPlan(Physical plan, ConsensusGroup groupId);
//TODO: 增删节点流程
TSStatus addDataNode(AddDataNodeRequest request);
TSStatus removeDataNode(RemoveDataNodeRequest request);
} |
缓存相关接口
ConfigNode
ClusterRequestHandler
...