方案特点


  • 原生分布式
    • 让 IoTDB 各个模块原生支持分布式,分布式或单机实例均通过这些模块组合而成。
    • 单机只是分布式的特殊情况。
  • 扩展性
    • 支持快速(秒级)增加节点,无需迁移数据
    • 支持新增节点后实时分担写入负载
    • 支持数据读写以及磁盘空间使用的负载均衡
  • 高可用
    • 客户端可自动切换
    • 单节点失效不影响集群服务
  • 可观测
    • 集群内置监控服务



名词汇总


名词类型对应现有的类解释
ConfigNode节点角色不对应类集群配置节点,管理集群节点信息、管理分区信息
DataNode节点角色不对应类数据节点,管理数据、元数据
ConfigManager模块新加分区节点管理者,处理集群内部请求
PartitionTable模块新加分区表结构,包含元数据分区信息和数据分区信息
StorageEngine模块StorageEngine一个进程内的唯一数据存储引擎(单例)
DataRegion模块VirtualStorageGroupProsessor管理一部分数据分区
SchemaEngine模块新加

一个进程内的唯一元数据管理引擎(单例)

SchemaRegion模块SchemaRegion管理一部分元数据分区
StorageGroup元数据存储组,不同存储组的数据物理隔离
DeviceGroup设备管理粒度每个存储组会对应固定个数的设备组,作为管理设备的基本单元,每个设备都会分配到某一个设备组中。



集群角色


ConfigNode:管理数据分区、元数据分区、节点状态信息

DataNode:管理数据和元数据


每个 Node 是一个进程,可将多个进程部署到一台机器上,支持更强的灵活性,可适配云环境部署。

集群架构

主要模块


IoTDB 是原生分布式架构,主要包括以下模块。

  • ConfigManager(分区管理器)
    • 收集节点状态信息、负责分区表的修改、扩缩容、负载均衡
  • PartitionTable(分区信息表)
    • 元数据分区表、数据分区表
  • StorageEngine(存储引擎)
    • 单例结构,内部管理多个 DataRegion。TsFile 数据文件、数据合并、数据同步
  • DataRegion(数据分区)
    • 管理一部分数据分区
  • SchemaEngine(元数据管理引擎)
    • 单例结构,内部管理多个 SchemaRegion 
  • SchemaRegion
    • 管理一部分元数据分区,提供元数据的增、删、查操作
  • Protocol(网络协议层)
    • 包含 RPC、RestAPI、MQTT 等多种协议的实现,将各种网络协议传来的请求转化为统一的数据处理格式
  • ServiceProvider(请求处理层)
    • 接收统一格式的数据处理请求,管理线程的并发模型。管理权限。
  • Planner(执行计划生成器)
    • SQL 解析器、查询计划生成、查询优化,生成 PhysicalPlan
  • QueryExecutor(查询执行器)
    • 原始数据查询、聚合查询等
  • Coordinator(协调器)
    • 接收执行计划,并判断此计划是本地执行还是远端执行。对于写入计划,交给共识模块进行多副本写入。对于查询计划,负责执行计划的拆分、分发读写请求、合并结果集。
  • Consensus(共识层)
    • 管理多个数据副本组,根据一致性级别调度读写请求到对应副本


DataNode 和 ConfigNode 的模块组成如下图所示,DataNode 的 Coordinator 和 Consensus 模块是可插拔的,启动 Standalone 版本时可不启动。



集群模块分布示例


每个存储组对应 集群总核数/副本数个 DataRegion(D) 共识组,并且对应 M 个 SchemaRegion(S) 共识组


S: SchemaRegion

D: DataRegion

C: ConfigManager


ConfigNode

功能:分区表管理者维护节点状态信息、数据分区表、元数据分区表,同时负责集群扩缩容和负载均衡。

节点状态信息

对于每个 DataNode,记录以下信息

  • 静态指标
    • CPU 核数、总内存、磁盘总容量、磁盘类型、网卡最大吞吐
  • 动态指标
    • CPU 信息(负载,利用率)、内存信息(剩余内存)、磁盘 IO 信息(磁盘利用率)、存储空间信息(剩余容量),网络流量负载(网卡当前吞吐)。
    • 数据负载:DataRegion 个数及其中的 Leader 个数,SchemaRegion 个数及其中的 Leader 个数
    • 对于每个 DataRegion:记录序列数量,数据文件个数、大小
    • 对于每个 SchemaRegion:记录序列数量

数据结构:DataNodeID->NodeInfo。(预估单节点的 NodeInfo 在 4KB 之下,因而此架构支持上百节点的节点状态表在 1MB 以内,不会成为瓶颈。)

元数据分片与分配

数据分片与分配

DataNode

管理数据、元数据的写入和查询。

分区表缓存

维护当前用到的分区表缓存,包括数据分区表和元数据分区表,以避免写入频繁访问 ConfigNode。

元数据持久化存储

每个 DataRegion 内部持久化存储其写入数据对应的元数据


作用

  • 数据写入时可在本地进行数据类型检查,避免每次写入访问 ConfigNode 和 SchemaRegion
  • 查询时进行本地序列的存在性检查
    • 场景:查询 d1.s1 时,分区表节点会告知协调者此设备所在的设备组所在的所有节点,其中某些节点可能不存在此序列,通过本地元数据可快速识别,避免访问每个数据文件


客户端

网络通信

节点增删

ConfigNode 增删流程

DataNode 增删流程

读写流程

数据管理:DML

数据写入

MPP 查询框架

元数据管理:DDL

元数据操作

共识层

负载均衡

集群容错机制

集群监控框架

数据导入导出

集群运维工具

待讨论

  • 设备模板的读写流程
  • 客户端与数据节点的【分区表缓存】及【读写流程适配】
  • 节点的后台线程,SEDA模型
  • 心跳,Lease
  • 各模块的改造:StorageGroupProsessor,PlanExecutor,SchemaRegion,PhysicalPlan



代码修改


开发分支:master

开发方式:不影响现有单机功能,新建包进行开发,最后替代老代码。


模块结构:

DataNode :单机模块转化为分布式模块(server module → DataNode module)

存储引擎的适配

去掉 StorageGroupManager,将 VirtualStorageGroupProsessor 交给 StorageEngine 管理

StorageEngine(单机与分布式共用此结构):管理 Map<StringID, DataRegionProcessor>

  • 单机中通过 StorageName 和 DeviceId 计算出 StringID,从而找到对应的 DataRegion。
  • 分布式中根据 DeviceId 从 ConfigNode 查询到对应的 DataRegionID(StringID),发给 DataNode,通过 StorageEngine 获取对应的 DataRegion。


元数据管理引擎的适配

引入 SchemaEngine 类,管理多个 SchemaRegion,数据结构 Map<StringID, SchemaRegion>

  • 单机:每个存储组对应一个 SchemaRegion
  • 分布式:根据 ConfigNode 查询到 SchemaRegion 的 RaftID(StringID),发给 DataNode,通过 SchemaEngine 获取对应的 SchemaRegion 。

请求处理层的适配(当前 master 代码已经符合以下设计)

Protocol:不区分单机与分布式,共用一套实现

ServiceProvider:

  • 单机:实现 StandAloneServiceProvider,将请求(PhysicalPlan)发送给 PlanExecutor(QueryExecutor)执行
  • 分布式:实现 ClusterServiceProvider,将请求(PhysicalPlan)发送给 Coordinator 执行,Coordinator 再与 ConfigNode 和其他 DataNode 交互进行读写。

新增 package: Coordinator

ConfigNode:新增 Module

新增 package:ConfigManager、PartitionTable

Consensus:新增 Module

开发步骤:

第一阶段:ConfigNode(ConfigManager 分区策略、PartitionTable)、Consensus 框架及 Ratis 集成、单机模块修改,集群启动、增加节点,DataRegion 内部元数据缓存管理( SchemaRegion 或 MTree)

第二阶段:写入流程、数据迁移流程,单机查询算子化,监控框架

第三阶段:MPP 查询引擎,ConfigManager 负载均衡策略,Consensus 的 sofa-jraft 集成

协作计划


模块(接口)内容参与贡献者设计定稿原型开发完成测试调优完成
分区管理(ConfigManager)元数据分区策略、元数据负载均衡策略(确定元数据从哪迁移到哪)陈荣钊3.314.315.31
数据分区策略、数据负载均衡策略(确定数据从哪迁移到哪)
共识层(Consensus、Raft)集群扩容、启动流程谭新宇
集群缩容、节点停机
数据迁移流程(负载均衡、缩容会触发数据迁移流程)
Consensus 层读写流程
Raft 读写流程
元数据操作(SchemaRegion)DDL 执行流程(DataNode 内的元数据缓存更新策略,向 ConfigNode 上报统计信息)薛恺丰、江天
数据写入(Coordinator、StorageEngine、DataRegion)写入流程(内存控制、客户端分区信息缓存管理、DataNode分区信息缓存管理)侯昊男、权思屹
查询引擎(Coordinator)查询算子张金瑞、田原、苏宇荣、魏祥威
基于规则的优化器
单机查询适配
分布式调度、执行器
查询内存控制
监控(MetricManager)集群监控框架张洪胤
多租户管理权限管理



资源隔离



  • No labels

2 Comments

  1. 上面 “集群模块分布示例    每个存储组对应 集群总核数/副本数个 VSG 共识组,并且对应 M 个 MManager 共识组” 部分。

    ================================================

    问题1: 这里的描述有是不是有些有问题?我猜它的本意应该是: 数据共识组个数 x 数据副本数 + 可配置的MManager共识组个数 x MManager副本数 = 集群总CPU核数


    问题2: 按CPU核数确定 共识组个数,这样共识组数量是不是太多了? 过多的共识模块instance数量,感觉没必要。

    上来就把共识组个数和CPU核数绑定,感觉不太合理。例如:我们这边1台线上机器一般100核左右,一个5节点3备份的clustet难道要用到 166 个共识组?

    可能的方案:

    1)可以考虑共识组个数改成可配置的 N 更好,将来可以根据测试效果再决定默认值。

    2)如果多个共识组的目的就是打散数据,一个 N节点 M备份的cluster,共识组个数可以是 ,如果实测共识组数量成为系统瓶颈时,可以把这个数字再乘以 K

    1. 很好的建议!我们初期的想法是一个 VSG 分配一个 CPU 核,最大化利用系统资源。现在还不确定这个算法是否合理,可能一个 VSG 用两核或是两个 VSG 用一核这样的搭配效率更高,之后我们会开展实验进行测试的