This document introduces the topology of major classes in the cluster module, i.e., how they are connected and their main functionality. The main purpose of the document is to help newcomers of the cluster module grasp how each class works, so they would be able to find where to modify if they want to contribute their code. However, any other advice or questions about the organization are welcomed, you may comment within this page or send mail to our mail list to discuss.


General Topology

Fig.1 shows the class diagram of important classes in the cluster module. For simplicity, class members and class methods are omitted here as they are just too many. Helper classes and utility classes are also ignored, so you would find many more classes in the cluster module.

Figure.1 Overall Class Topology

Class Functionalities

Program Entry Point

ClusterMain is the main class in the cluster module, which performs some developer-level customization and start a MetaClusterServer, which will further initialize the Raft logics and the underlying IoTDB.

Coordinator :  route database operations to the responsible DataMember with the help of its partition table.

Servers

Servers are implementations of Thrift RPC servers, their main concerns are network transport and protocol parsing. Each of them listens on a port and an IP address, accept connection from clients, wrap connections with certain types of Thrift transports, read requests from the transports, parse them with specific protocols, and send the requests to associated RaftMembers. Each node only has one each server.

  • MetaClusterServer: listens to internal_meta_port, receives MetaGroup-related Raft requests, and forwards them to MetaMember. It also starts the underlying IoTDB once it is up.
  • MetaHeartbeatServer: listens to internal_meta_port + 1, receives MetaGroup heartbeats, and forwards them to MetaMember.
  • DataClusterServer: listens to internal_data_port, receives DataGroup-related Raft requests, decides which DataMember will process it, and forwards to it.
  • DataHeartbeatServer: listens to internal_data_port + 1, only receives DataGroup heartbeats, decides which DataMember will process it, and forwards to it.
  • ClientClusterServer: listens to external_client_port, receives database operations from clients, and forwards them to MetaMember.

Raft Members

Raft Members are implementations and extensions of the Raft algorithm, which handles log generation, log replication, heartbeat, election, and catch-up of falling behind nodes. Each RaftMember plays one role in exactly one Raft group. One cluster node has exactly one MetaMember and multiple DataMembers, depending on the number of replications and partitioning strategy.

  • RaftMember: an abstraction of common Raft procedures, like log generation, log replication, heartbeat, election, and catch-up of falling behind nodes.
  • MetaMember: an extension of RaftMember, representing a member in the MetaGroup, where cluster configuration, authentication info, and storage groups are managed. It holds the partition table, processes operations that may modify the partition table. One node has only one MetaMember.
  • DataMember: an extension of RaftMember, representing a member in one DataGroup, where timeseries schemas and timeseries data of one cluster-level partition are managed. It also handles data queries and thus manages query resources. One node may have multiple DataMembers, depending on how many data partition it manages, which is further decided by the number of replications and partitioning strategy.

Partition Table

Partition Table maintains the mapping from a partition key (storage group name and time partition id in IoTDB) to a DataMember(or a node). As timeseries schemas and data are replicated only in a subset of the cluster (i.e., a DataGroup), operations involving them can only be applied to specific nodes. When a node receives an operation, and it does not manage the corresponding data or schemas of the operation, the node must forward it to the right node, and this is where a Partition Table applies.

  • SlotPartitionTable: a consistent-slot-hashing-based Partition Table, which divides the whole data into a fixed number of slots (10000 slots by default), distributes the slots evenly to all DataGroups, hashes the partition key to one of the slots, and the data holder will be the holder of the slot. Upon the DataGroups are updated, the slots are re-distributed and the changes are recorded for data migration.

HeartbeatThread

Heartbeat Thread is an important component of a Raft Member. As the character of a Raft Member is always one of LEADER, FOLLOWER, or ELECTOR(CANDIDATE), it utilizes Heartbeat Thread in three ways: as a LEADER, Heartbeat Thread sends heartbeats to followers; as a FOLLOWER, Heartbeat Thread periodically checks whether the leader has timed out; as a CANDIDATE, it starts elections and send election requests to other nodes until one of the nodes becomes a valid leader.

  • HeartbeatThread: an abstraction of heartbeat-related transactions and each RaftMember will have its own HeartbeatThread. When the associated member is a LEADER, the thread sends a heartbeat to each follower periodically; if the member is a FOLLOWER, it checks the timestamp when the last heartbeat was received, and decide whether the leader has timed out; otherwise, the member is an elector, the thread will start an indefinite election loop, asks for votes from other members in this Raft group, and break from the loop once itself or another node has become a leader.
  • MetaHeartbeatThread: an extension of HeartbeatThread for MetaMember. It requires the follower to generate a new identifier if its identifier conflicts with a known node during the cluster-building phase. It also sends followers to the partition table if it is required.
  • DataHeartbeatThread: an extension of HeartbeatThread for DataMember. It includes the header node of its group so the receiver knows which group this heartbeat is from, and it includes the log progress of its associated MetaMember to ensure the leader of a data group also has an up-to-date partition table.

RaftLogManager

RaftLogManager provides interfaces to append, commit, and get Raft logs, controls how Raft logs are organized in memory and persisted on disks when to apply committed logs to the underlying IoTDB, and how to generate a snapshot before a certain log index. As each Raft group keeps its own log, each RaftMember has its own RaftLogManager.

  • RaftLogManager: a basic RaftLogManager, which manages the latest logs in memory and colder logs on disks, providing interfaces such as append, commit, and get. It applies logs after they are committed, but how they are applied depends on the passed LogApplier. It provides no snapshot implementation, so sub-classes extending it should focus on their own definition of snapshots.
  • MetaSingleSnapshotLogManager: an extension of RaftLogManager for MetaMembers, which provides MetaSimpleSnapshot containing storage groups, TTLs, users, roles, and partition tables.
  • FilePartitionedSnapshotLogManager: an extension of RaftLogManager for DataMembers, which provides PartitionedSnapshot<FileSnapshot>, consisting of a map whose keys are slot numbers, and values are FileSnapshots. FileSnapshot contains timeseries schemas and hardlink TsFiles (in the form of RemoteTsFileResource) of a slot, which are exactly what IoTDB holds in a slot.


Workflows

Below are some important workflows involved in the cluster module, including cluster setup, election, and data ingestion or query. Only non-exceptional flows are shown currently, because almost all exceptions will be thrown directly to the upper level,  converted to erroneous status codes, and finally returned to the clients. Some unimportant returns are omitted to reduce the number of lines and keep the figure clear.

Start-up(leader)

The procedure of the start-up of a MetaGroup leader is shown in fig.2. Please notice that DataMembers are activated by MetaMembers once it has constructed or received a partition table, and DataMembers perform elections similarly to MetaMembers, so DataMembers are not introduced separately. The procedure is detailed below:

  1. Start-up script `start-node.sh` creates ClusterMain, the program entry point of a cluster node;
  2. ClusterMain creates a MetaClusterServer, which receives MetaGroup RPCs from internal_meta_port;
  3. MetaClusterServer checks its configuration with other nodes, ensuring that at least over half of the nodes have consistent configuration;
  4. MetaClusterServer initializes the underlying IoTDB;
  5. MetaClusterServer creates a MetaMember, which is initialized as an ELECTOR, handles MetaGroup RPCs, and manages a partition table; a Coordinator, which is a coordinator for non-query;
  6. MetaMember tries to load the partition table from the local storage if it exists;
  7. MetaMember creates its MetaHeartbeatThread;
  8. MetaHeartbeatThread sends election requests to other cluster nodes;
  9. The quorum of the MetaGroup agree with the election and send responses to MetaClusterServer;
  10. MetaClusterServer lets MetaMember handle these responses;
  11. MetaMember gathers the responses and confirms that it has become a LEADER, then create a partition table if there is none;
  12. MetaMember creates DataClusterServer, which receives DataGroup RPCs from internal_data_port;
  13. DataClusterServer creates DataMembers depending on the partition table and the number of replications k; k DataMembers will be created, each for a DataGroup the node is in;
  14. DataMembers establish their own DataHeartbeatThreads, and by following similar procedures, they become FOLLOWERs or LEADERs;
  15. MetaMember creates ClientServer, which receives requests from clients, so by now, the node is ready to serve.


Figure.2 Start-up of the MetaGroup leader


Fig.3 demonstrates the start-up of a follower node. Followers and leaders are the same for the first few steps, and the divergence point is that followers fail to gather enough votes for themselves before they receive a heartbeat from a legal leader.

  1. Start-up script `start-node.sh` creates ClusterMain, the program entry point of a cluster node;
  2. ClusterMain creates a MetaClusterServer, which receives MetaGroup RPCs from internal_meta_port;
  3. MetaClusterServer checks its configuration with other nodes, ensuring that at least over half of the nodes have consistent configuration;
  4. MetaClusterServer initializes the underlying IoTDB;
  5. MetaClusterServer creates a MetaMember, which is initialized as an ELECTOR, handles MetaGroup RPCs, and manages a partition table;a Coordinator, which is a coordinator for non-query;
  6. MetaMember tries to load the partition table from the local storage if it exists;
  7. MetaMember creates its MetaHeartbeatThread;
  8. The leader of MetaGroup sends a heartbeat to MetaClusterServer;
  9. MetaClusterServer lets MetaMember handle the heartbeat;
  10. MetaMember becomes a LEADER, then update its partition table from the heartbeat if the heartbeat provides a newer heartbeat;
  11. MetaMember creates DataClusterServer, which receives DataGroup RPCs from internal_data_port;
  12. DataClusterServer creates DataMembers depending on the partition table and the number of replications k; k DataMembers will be created, each for a DataGroup the node is in;
  13. DataMembers establish their own DataHeartbeatThreads, and by following similar procedures, they become FOLLOWERs or LEADERs;
  14. MetaMember creates ClientServer, which receives requests from clients, so by now, the node is ready to serve.

Figure.3 Start-up of the MetaGroup leader

How cluster metadata related operations are executed is explained in Figure.4. Notice that when we say metadata in the context of the cluster module, we mean cluster configurations, storage groups, and authentication info, which may be different from concepts of the stand-alone IoTDB. As the flow is simpler compared with start-up, the leader side, and follower side are put together for the convenience of comparison.

The leader side:

  1. A client sends a request to ClientServer;
  2. ClientServer reads the request of a socket and lets Coordinator handle it;
  3. Coordinator lets MetaMember handle the request;
  4. MetaMember creates a log for the operation and appends it to its RaftLogManager;
  5. MetaMember sends the log to its followers;
  6. When MetaMember gathers enough responses from the followers, it commits the log through its RaftLogManager;
  7. Depending on what the operation is, its RaftLogManager applies the log to the underlying IoTDB or the partition table;
  8. The result of the operation is returned to the client;

The follower side:

  1. The leader sends a log to the follower's MetaClusterServer;
  2. MetaClusterServer parses the log and lets MetaMember handle it;
  3. MetaMember appends the log its RaftLogManager;
  4. MetaMember sends a response to the leader;
  5. The leader sends heartbeats to MetaHeartbeatServer; after committing the log, the heartbeats from the leader will contain a committed log index no less than the log's index;
  6. MetaClusterServer lets MetaMember handle the heartbeat;
  7. On finding the heartbeat contains a newer committed log index, MetaMember commits new logs through its RaftLogManager;
  8. RaftLogManager applies the committed logs to the underlying IoTDB or the partition table accordingly.

Figure.4 Metadata flows

Data in the cluster module means both timeseries schemas and timeseries data since they are partitioned into multiple DataGroups and not stored globally, so coordinators may be necessary to find the right nodes that should store the corresponding data. As the partition table is the sole data structure that may help coordination, its owner, MetaMember, Coordinator need the help of MetaMember. Fig.5 shows the whole procedure involving cluster data related operations.

The coordinator side:

  1. A client sends a request to the coordinator's ClientServer;
  2. ClientServer parses the request and lets Coordinator handle it;
  3. Coordinator routes the request with the help of metaMember;
  4. Coordinator sends the request to the DataGroup(s) that should process it; the request may be split before sending to each DataGroup;
  5. The receivers process the request and return their responses to Coordinator;
  6. Coordinator concludes the results and return it to the client;

The data leader side:

  1. A client sends a request to the leader's ClientServer;
  2. ClientServer parses the request and lets Coordinator handle it;
  3. Coordinator routes the request with the help of metaMember;
  4. Finding out that the node should process the request, MetaMember forwards the request to its DataClusterServer;
  5. DataClusterServer finds the associated DataMember that should process it;
  6. DataMember creates a log for the request and appends it to its RaftLogManager;
  7. DataMember sends the log to other nodes in its DataGroup;
  8. After sending to enough replicas, DataMember commit the log to its RaftLogManager;
  9. RaftLogManager then applies the log to the underlying IoTDB;
  10. The result is returned to the client.

The data follower side:

  1. The data leader sends a log to the follower's DataClusterServer;
  2. DataClusterServer finds the associated DataMember using the header within the request;
  3. DataMember appends the log to its RaftLogManager;
  4. DataMember returns a response to the data leader;
  5. DataHeartbeatServer receives a heartbeat that is after data leader committing the log;
  6. DataClusterServer finds the associated DataMember using the header within the heartbeat;
  7. DataMember commits its logs through its RaftLogManager according to the heartbeat;
  8. RaftLogManager then applies the log to the underlying IoTDB;

Figure.5 Data flows

  • No labels