The discussion is from this page: Chinese

In the discussion, we summarize some drawbacks of the current cluster module and propose some ideas.

We welcome all contributors to help complete the design (add comments in a reply if you have no edit permission).

We will move to the next step after drawing all class structures and relationships.

IM discussion (Slack: cluster channel): https://join.slack.com/t/apacheiotdb/shared_invite/zt-qvso1nj8-7715TpySZtZqmyG5qXQwpg

Current issues and Solutions

  • Raft's linearizable serialization should stay consistent with the standalone version's linearization.
  • Data partitioning (how to partition data, and how to assign the data blocks to different nodes)
  • Data partitioning granularity: SG + time range.
  • Metrics in cluster
  • Scale-out can only be executed one node at a time
  • There are many synchronized keywords
  • How to partition the schema -> keep the current status
  • Current Raft and business codes are coupled -> decouple
  • There are too many thread pools -> SEDA
  • The meta group covers all the nodes, which makes it impossible to scale to hundreds of nodes -> reduce it to several nodes
  • Query framework -> MPP

New Design

1. Rules

  • Rule for the number of involved replicas:
    • Write operation: quorum
    • Delete (metadata) operation: All nodes
      • This means the Deletion operation has lower availability.
    • Query operation: one node or two (sync with the leader)
  • For all operations, if a node forwards a request to the wrong receiver, the receiver replies with an error.
    • However, since we currently cannot guarantee that all raft groups in a data partition share the same leader, we have to add a forwarding stage when the receiver is a follower. If the receiver does not belong to the group at all, it replies with an error.
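
A minimal sketch of the replica-involvement rule above, as a hypothetical `ReplicaRule` helper (none of these names come from the real codebase):

```java
public final class ReplicaRule {

  public enum OperationType { WRITE, DELETE_METADATA, QUERY }

  /** How many replicas must acknowledge an operation of the given type. */
  public static int requiredAcks(OperationType op, int replicaCount) {
    switch (op) {
      case WRITE:
        return replicaCount / 2 + 1; // quorum: a majority of the group
      case DELETE_METADATA:
        return replicaCount;         // all nodes, hence lower availability
      case QUERY:
        return 1;                    // one node; 2 when syncing with the leader
      default:
        throw new IllegalArgumentException("unknown operation: " + op);
    }
  }
}
```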

2. Roles

There are three types of managers in the cluster. A single node may host all three types of managers.

  • Partition Table managers
    • Several nodes in the cluster. All other nodes must know them (from the configuration file).
    • Knows how the schema is partitioned, and how the data is partitioned.
  • Partitioned Schema managers
    • Several nodes in the cluster.
    • Each Partitioned Schema has a raft group.
  • Partitioned Data managers
    • Each data partition has several raft groups (= the number of SGs × virtual nodes in the partition table); see the sketch below.
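
To make the group count concrete, here is a trivial sketch of the computation above (illustrative names only):

```java
/** Number of data raft groups per data partition = SGs × virtual nodes. */
public final class DataGroupCount {

  public static int dataRaftGroups(int storageGroups, int virtualNodesPerSg) {
    return storageGroups * virtualNodesPerSg;
  }

  public static void main(String[] args) {
    // e.g., 10 SGs with 2 virtual nodes each -> 20 data raft groups
    System.out.println(dataRaftGroups(10, 2));
  }
}
```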

3. Modules

  • Partition table manager: manages the data/schema partition tables and provides partition-table services
  • Client RPC: receives requests (read/write × data/schema)
  • Coordinator: routes requests to the data/schema group according to the partition table
  • PlanExecutor: handles requests; for a write request, replicates the request via the Raft module and executes the write; for a query, splits the plan, routes sub-plans to the related data groups, and merges the results
  • Raft: log replication
  • StorageEngine: manages local write requests and data queries
  • QueryExecutor: manages local query requests
  • MManager: manages schema info
  • Node RPC: internal RPC module
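
The relationships among these modules might be expressed roughly as the interfaces below. All type and method names are illustrative assumptions, not the actual cluster API:

```java
// Placeholder types; the real classes are far richer.
class Request {}
class Result {}
class PhysicalPlan {}
class RaftGroupId {}

interface PartitionTableManager {
  /** Which raft group owns this storage group + time partition? */
  RaftGroupId lookup(String storageGroup, long timePartition);
}

interface Coordinator {
  /** Routes a request to its owning data/schema group using the PT cache. */
  Result route(Request request);
}

interface PlanExecutor {
  /**
   * Writes: replicate the request through the Raft module, then apply it.
   * Queries: split the plan, fan sub-plans out to data groups, merge results.
   */
  Result execute(PhysicalPlan plan);
}
```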

4. Process

4.1 Create SG

  • 1. The coordinator checks whether the PT (Partition Table) cache has the SG (Storage Group).
  • 2. If it already exists, reject the request.
  • 3. If not, send the request to the PT raft group.
  • 4. After success, update the local PT cache.
  • Note that all other nodes still do not have the new SG in their PT caches.
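
A sketch of this flow under assumed names (PtCache, PtRaftClient); this is not the real API:

```java
class CreateSgHandler {
  enum Status { SUCCESS, ALREADY_EXISTS, FAILED }

  interface PtCache {
    boolean containsStorageGroup(String sg);
    void addStorageGroup(String sg);
  }

  interface PtRaftClient {
    Status proposeCreateSg(String sg); // replicate through the PT raft group
  }

  private final PtCache ptCache;
  private final PtRaftClient ptRaftClient;

  CreateSgHandler(PtCache cache, PtRaftClient client) {
    this.ptCache = cache;
    this.ptRaftClient = client;
  }

  Status createStorageGroup(String sg) {
    if (ptCache.containsStorageGroup(sg)) {
      return Status.ALREADY_EXISTS;                   // step 2: reject
    }
    Status status = ptRaftClient.proposeCreateSg(sg); // step 3: PT raft group
    if (status == Status.SUCCESS) {
      ptCache.addStorageGroup(sg);                    // step 4: update cache
    }
    return status;
  }
}
```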

4.2 Delete SG

  • The coordinator sends the request to the PT raft group.
  • The PT group calls the raft protocol (to start the operation and get approval).
  • The PT group sends the request to ALL NODES.
  • Each node deletes its data and schema and updates its local PT cache.
  • Once all nodes succeed, the PT group calls the raft protocol to finish deleting the SG.
  • If some nodes fail, the PT group calls the raft protocol to stop deleting the SG (meaning some nodes may still have the SG, schema, and data).

When restarting, if the raft log has a start operation without a matching stop/finish operation, call stop.
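
A sketch of this start/finish/stop protocol, with hypothetical PtRaft and NodeRpc interfaces (the restart rule appears as the final comment):

```java
class DeleteSgHandler {
  enum Status { SUCCESS, PARTIAL_FAILURE }

  interface PtRaft {
    void logStartDelete(String sg);  // raft entry: operation started
    void logFinishDelete(String sg); // raft entry: all nodes succeeded
    void logStopDelete(String sg);   // raft entry: aborted, SG may survive
  }

  interface NodeRpc {
    /** Delete data + schema on the node and update its PT cache. */
    boolean deleteSgLocally(String node, String sg);
  }

  private final PtRaft ptRaft;
  private final NodeRpc rpc;
  private final java.util.List<String> allNodes;

  DeleteSgHandler(PtRaft ptRaft, NodeRpc rpc, java.util.List<String> nodes) {
    this.ptRaft = ptRaft;
    this.rpc = rpc;
    this.allNodes = nodes;
  }

  Status deleteStorageGroup(String sg) {
    ptRaft.logStartDelete(sg);           // get approval via raft first
    boolean allSucceeded = true;
    for (String node : allNodes) {
      allSucceeded &= rpc.deleteSgLocally(node, sg);
    }
    if (allSucceeded) {
      ptRaft.logFinishDelete(sg);        // deletion committed everywhere
      return Status.SUCCESS;
    }
    ptRaft.logStopDelete(sg);            // some nodes still hold the SG
    return Status.PARTIAL_FAILURE;
  }

  // On restart: a start entry with no finish/stop entry resolves to stop.
}
```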

4.3 Query a given SG

  • 1. The coordinator checks whether the PT cache has the SG.
  • 2. If it does, return the result.
  • 3. If not, send the request to the PT group.
  • 4. After success, update the local PT cache.
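
The check-cache / pull-on-miss pattern used here (and again in 4.5, 4.7, and 4.9) might look like this; all names are assumptions:

```java
class PtCacheLookup {
  static class PartitionInfo {}

  interface PtCache {
    PartitionInfo get(String sg);
    void put(String sg, PartitionInfo info);
  }

  interface PtGroupClient {
    PartitionInfo pull(String sg); // ask the PT raft group for fresh info
  }

  private final PtCache cache;
  private final PtGroupClient ptGroup;

  PtCacheLookup(PtCache cache, PtGroupClient ptGroup) {
    this.cache = cache;
    this.ptGroup = ptGroup;
  }

  PartitionInfo getStorageGroup(String sg) {
    PartitionInfo info = cache.get(sg); // step 1: check the local PT cache
    if (info != null) {
      return info;                      // step 2: cache hit, return directly
    }
    info = ptGroup.pull(sg);            // step 3: miss, ask the PT group
    cache.put(sg, info);                // step 4: refresh the local cache
    return info;
  }
}
```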


4.4 Query fuzzy SGs (“fuzzy” means paths that contain *)

  • 1. The coordinator sends the request to the PT group.
  • 2. After success, it may update the local PT cache.

4.5 Create TS

  • 1. The coordinator checks whether the local schema has the TS (timeseries).
  • 2. If it already exists, reject the request.
  • 3. If not, search the PT cache (possibly pulling a new cache) and send the request to the corresponding data partition/group leader.

Note that all other nodes still do not have the new TS in their PT caches.

4.6 Delete TS

  • 1. The coordinator checks the PT cache (possibly pulling a new cache).
  • 2. The coordinator sends the request to the corresponding data group leader.
  • 3. The leader forwards it to ALL NODES.
  • 4. Each node deletes the data and the MTree entry.
  • Once all nodes succeed, the PT group calls the raft protocol to finish deleting the TS.
  • If some nodes fail, the PT group calls the raft protocol to stop deleting the TS (meaning some nodes may still have the TS and its data).

When restarting, if the raft log has a start operation without a matching stop/finish operation, call stop.

4.7 Query given TSs (Schema)

  • 1. The coordinator checks whether the MManager has the TSs.
  • 2. If it does, return the result.
  • 3. If not, query the PT cache (possibly pulling a new cache) and send the request to a data group node (not necessarily the leader).
  • 4. After success, update the MManager.

4.8 Query fuzzy TSs (“fuzzy” means paths that contain *)

  • 1. Query the PT cache (possibly pulling a new cache) and send the request to a data group node (not necessarily the leader).
  • 2. After success, the MManager may be updated.

4.9 Write

  • 1. The coordinator checks whether the PT cache can process the request (possibly pulling a new cache).
    • 1. If the SG does not exist, pull the cache, with an update request attached, from the PT group.
    • 2. If the SG exists in the PT but the time range does not, send an update to the PT group.
  • 2. Send the request to the corresponding data partition (not group) leader, i.e., Node B in the figure.
  • 3. Tip: a data partition may have several raft groups (= SG × virtual SG, for now).
  • 4. For each operation on B, check whether B is the leader of the operation's raft group; if not, forward to the leader (this is just a temporary solution and will be redesigned).
  • 5. For each operation received by the raft group leader (B, C, ...), if it has no schema locally, forward a “Create TS” request to the corresponding data partition/group leader. (It may receive “already exists”, meaning this node begins to process a new time range of the TS, or “create success”, meaning this TS is new. Either way, update the local MManager.)
  • 6. Call raft for replication.
  • 7. Execute.
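
A sketch of steps 4-7 under assumed names (RaftGroup, MManager, SchemaClient, StorageEngine); the real write path is considerably more involved:

```java
class WritePath {
  enum Status { SUCCESS, FORWARDED, FAILED }

  interface RaftGroup {
    boolean isLeader();                // is this node the group's leader?
    Status forwardToLeader(Object plan);
    void replicate(Object plan);       // step 6: raft log replication
  }

  interface MManager {
    boolean hasTimeseries(String path);
    void cacheTimeseries(String path); // update local schema after creation
  }

  interface SchemaClient {
    /**
     * Forward "Create TS" to the owning data partition/group leader.
     * May answer "already exists" (a new time range of an existing TS)
     * or "create success" (a brand-new TS).
     */
    void createTimeseries(String path);
  }

  interface StorageEngine {
    Status execute(Object plan);       // step 7: apply the write locally
  }

  private final MManager mManager;
  private final SchemaClient schemaClient;
  private final StorageEngine storageEngine;

  WritePath(MManager m, SchemaClient s, StorageEngine e) {
    this.mManager = m;
    this.schemaClient = s;
    this.storageEngine = e;
  }

  Status write(RaftGroup group, Object plan, String path) {
    if (!group.isLeader()) {
      // step 4: temporary solution, a follower forwards to the leader
      return group.forwardToLeader(plan);
    }
    if (!mManager.hasTimeseries(path)) {
      schemaClient.createTimeseries(path); // step 5: auto-create the schema
      mManager.cacheTimeseries(path);
    }
    group.replicate(plan);                 // step 6
    return storageEngine.execute(plan);    // step 7
  }
}
```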

4.10 Query given TSs

  • 1. The coordinator checks whether the PT cache can process the request (possibly pulling a new cache).
    • 1. If the SG does not exist, pull the cache, with an update request attached, from the PT group.
    • 2. If the SG exists in the PT but the time range does not, send an update to the PT group.
  • 2. Send the request to the corresponding data partition (not group) leader (with weak consistency, a follower is also fine), i.e., Node B in the figure.
  • 3. Tip: a data partition may have several raft groups (= SG × virtual SG, for now).
  • 4. For each operation on B, check whether to call raft replication according to the consistency level.
  • 5. Run locally.
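
Step 4 might look like the following; Consistency, RaftGroup, and QueryExecutor are assumed names:

```java
class ReadPath {
  enum Consistency { STRONG, WEAK }

  interface RaftGroup {
    /** Catch this replica up with the leader's commit index. */
    void syncWithLeader();
  }

  interface QueryExecutor {
    Object runLocally(Object plan); // step 5: execute on local data
  }

  private final QueryExecutor executor;

  ReadPath(QueryExecutor executor) {
    this.executor = executor;
  }

  Object read(RaftGroup group, Object plan, Consistency level) {
    if (level == Consistency.STRONG) {
      group.syncWithLeader(); // strong reads sync through raft first
    }
    // weak consistency: a follower serves directly from local state
    return executor.runLocally(plan);
  }
}
```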


4.11 Query fuzzy TSs

  • 1. The coordinator checks whether the PT cache can process the request.
    • 1. If not, the PT group returns the result, and the coordinator pulls the cache.
    • 2. If it can, the coordinator calculates the result locally.
  • 2. Send the request to the corresponding data partition (not group) leader (with weak consistency, a follower is also fine), i.e., Node B in the figure.
  • 3. Tip: a data partition may have several raft groups (= SG × virtual SG, for now).
  • 4. For each operation on B, check whether to call raft replication according to the consistency level.
  • 5. Run locally.

8 Comments

  1. Xiangdong Huang Last week we focused more on the module architecture and roles in the refactoring plan, but it seems we didn't pay much attention to the service architecture.
    As we have decided to separate IoTDB into different roles, what are the advantages and disadvantages of deploying one role per process versus one role per thread? Maybe we can consider these:

    1. Resource control: In a very large cluster, most of the communication between different roles is based on RPC, which resembles a microservice architecture. With the process model (one role per process), it is easy to control the resources each role uses. But if we mix the roles in a single process, the whole process may be affected by one role with a heavy workload, and we must pay close attention to heap usage, which is shared across roles.
    2. Scaling: One role per process is easy to scale, especially in a K8S environment; we only need to care about each role's performance in isolation. But with the thread model we have to ask: this node's workload is heavy, so which role is the root cause? Answering that requires many monitoring metrics and online profiling.
    3. Coding: The process model avoids much in-process memory sharing, which otherwise requires synchronization between threads and leads to subtle, hard-to-solve bugs.

    I prefer the process model over the thread model.

    1. Hi, I agree with this viewpoint.

      The drawback is that it may raise the difficulty of deployment.

      Separating the PT role from the others seems fine: it mainly communicates with the other roles via RPC and does not need the other IoTDB modules such as StorageEngine, MManager, etc. So the cost is low.

      But if we separate the schema and data roles, the additional cost seems too large, as both of them hold MManager/MTrees, which are not small structures in memory or on disk.

      1. In my opinion, if the goal of our architecture reconstruction is to support clusters with more than 100 nodes, the difficulty of deployment should be solved by providing tools rather than requiring users to write scripts and configure everything manually. In that case it does not matter how hard deployment is, because we who develop the new architecture will support it. See PingCAP's cluster deployment tool TiUP, which I think should be a model for us to follow.

      2. I think there are three roles in the cluster: data manager, schema manager, and partition table manager.

        Data managers have: (1) data (2) schema cache (3) partition table cache

        Schema managers have: (1) schema (2) partition table cache

        Partition table managers have: (1) partition table

        The partition table is shared by all roles, and the schema is shared by two roles.

        So if the partition table can run as a separate process, the schema could as well...

    2. The big question between the process model and the thread model is efficiency rather than deployment. In the thread model, communication and sharing happen in memory; in the process model, they go through RPC, which is far less efficient and can easily cause delays. Concurrency control over the PT/schema is needed anyway, even in a separate process, because that process is itself multi-threaded. Besides, what a data node needs from the PT/schema managers is basically reads, not writes.

      Theoretically speaking, every thread pool could be split into an independent process for the sake of SOA, but manageability and efficiency would suffer. Caution!

  2. 1. Excessive introduction of caches reduces effective memory utilization and adds complexity, which is more likely to cause subtle bugs.
    2. According to the current process, I think the PT can be separated into a standalone service that handles all partition-table requests, which is easier to maintain.
    3. If we want to preserve the peer-to-peer design, can we divide the cluster into different groups and keep, in each group, all the metadata it needs to manage, so that all related requests can be completed within the specified group?


    1. 1. But if we do not use caches, there may be too many RPCs.

      2. +1

      3. “Divide it into different groups and maintain all the metadata it needs to manage in each group so that all related requests can be completed in the specified group” is independent of “peer-to-peer”.

      The current architecture does not consider “dividing into different groups” (or call it multi-region).

      We need to consider how to leave a suitable interface here and implement it in the future;
      otherwise we need to weigh the development workload carefully.

  3. We may have to synchronize the partition table with the PT roles,

    because if the coordinator's PT cache is stale, the coordinator cannot split a plan into correct sub-plans...

    Do we want to provide such strong consistency?