Introduction

This document introduces how a node manages the status of other nodes in the cluster module. Three major situations involve node status management: down node detection, query coordination, and Raft log progress tracking. We briefly describe the three situations below:

First, nodes in a cluster can become unavailable randomly and unpredictably, due to a process crash, machine failure, network partitioning, or other reasons. Some kinds of unavailability present as immediate exceptions such as a broken pipe or a refused connection, while others throw no immediate exception and the client must wait until a timeout. Without proper records of the status of other nodes, a node may keep sending requests to unavailable nodes, wasting resources and waiting unnecessarily. It is much more efficient to record, with confidence, whether a node is down and to stop sending requests to it while it is considered down.

Second, when several replicas can serve a query during query coordination, the replica with the best expected query performance should be chosen. It is up to the NodeStatusManager to provide the necessary information, such as hardware specs and system load, which helps the query coordinator assess the performance of each replica and then choose the best one. Such information can become stale over time, so NodeStatusManager also decides when to refresh it.

Third, since the Raft algorithm is used to reach consensus, a node that is a leader also has to store the log progress (last log index) of the other nodes in its Raft group, so it can decide whether to send the latest logs to a follower or skip it until it has caught up. It is pointless to send the newest logs to a follower that has already fallen behind, as it cannot append new logs while previous ones are missing. The record also helps when the leader decides to perform a catch-up, because the leader needs to know the starting point of the catch-up.

Data Structures

We notice that in the first two situations, the related information is orthogonal to any specific consensus algorithm, while the third is bound exclusively to the Raft algorithm, so we design two data structures for the three situations: NodeStatus for the first two, and Peer for the last one. Although Peer seems unlikely to change because the Raft algorithm is rather fixed, NodeStatus may be modified frequently as we switch between different load-balancing algorithms. Below we describe each data structure separately.

NodeStatus

As the name suggests, NodeStatus holds the physical status of a node, which further includes:

  • TNodeStatus status: a Thrift structure for information exchange between different nodes; it is currently empty because we have not decided on a specific load-balancing algorithm and are therefore not yet sure what information it should contain, but it is reasonable to expect general status such as CPU usage, RAM usage, disk usage, the number of currently running queries, and the number of currently running compactions.
  • int64 lastUpdateTime: a millisecond timestamp recording when this structure was last updated; based on this, NodeStatusManager decides whether to refresh the status of a node by requesting the newest status from that node.
  • int64 lastResponseLatency: the latency observed the last time the local node requested this node's current status, in nanoseconds; we currently assign replica priorities mainly based on this field, so the replica with the lowest latency is accessed first during a query.
  • boolean isActivated: a flag indicating whether a node is reachable; if a request to the node times out or its connection is refused, the flag is set to false, and subsequent connections to the node fail directly (a null client is returned) instead of waiting for another timeout or throwing an exception. Some special requests, such as heartbeats, configuration checks, and status updates, ignore this flag and still obtain non-null clients, because part of their job is to discover when a node is back to normal. If any request to the node succeeds, the flag is set back to true, so further requests will try to connect to the node instead of failing directly.
  • int64 lastDeactivatedTime: a millisecond timestamp recording when isActivated was last set to false. There may be no heartbeat between a coordinator and another node because they are not in a leader-follower relationship, so once a coordinator deactivates a node, it may be hard for the coordinator to learn in time that the node is back to normal. Although a restarting node sends handshakes (heartbeats carrying no message other than the sender), the handshakes may be lost for a variety of reasons, and the node would consequently be deactivated forever by the coordinator. To avoid this, we also record when the node was deactivated, and if more than 10 minutes have passed since then, the next connection to the node ignores the isActivated flag. This gives nodes that exchange no heartbeats a chance to detect a restart. If the connection succeeds, the flag is set back to true and further coordination is unblocked; otherwise, lastDeactivatedTime is updated and connections fail directly for another 10 minutes.

NodeStatus is managed as a Map<Node, NodeStatus> by NodeStatusManager, which is a singleton shared by MetaGroupMember and all DataGroupMembers, so if one RaftMember activates/deactivates a node, other members will also be affected.
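
Below is a minimal Java sketch of what NodeStatus might look like, derived only from the fields listed above; the exact field and accessor names in the code base may differ.

    // Illustrative sketch of NodeStatus; names and types are assumptions
    // based on this document, not the actual source.
    public class NodeStatus {

      // Thrift structure exchanged between nodes; currently empty.
      private TNodeStatus status;

      // Millisecond timestamp of the last time `status` was refreshed.
      private long lastUpdateTime;

      // Latency (in nanoseconds) of the last status request to this node;
      // replicas with lower latency are preferred during queries.
      private long lastResponseLatency;

      // Whether the node is considered reachable; initialized to true.
      private volatile boolean isActivated = true;

      // Millisecond timestamp of the last time the node was deactivated.
      private long lastDeactivatedTime;

      // getters and setters omitted for brevity
    }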

Peer

Peer stores the Raft-related information of a node and is only maintained by a leader (Peers on a follower node are not used). It includes:

  • int64 nextIndex: this field is deprecated and may be removed;
  • int64 matchIndex: the index of the last log that is confirmed to exist on both the local node and the associated node; when appending logs to followers, the leader waits if a follower's matchIndex is too far behind the log currently being sent, avoiding too many concurrent requests on the follower side; during catch-ups, matchIndex helps determine the range of logs that need to be sent to a follower;
  • AtomicInt32 inconsistentHeartbeatNum: a counter that is increased when a follower's heartbeat response reports a last log index that is smaller than the local one and equal to lastHeartbeatIndex, and reset when the reported index equals the local one or differs from lastHeartbeatIndex; when it reaches a certain number (5 by default), a catch-up is triggered;
  • int64 lastHeartbeatIndex: the last log index reported in the latest heartbeat response; through this, the leader knows whether a follower is making progress or has stalled, and triggers a catch-up if the follower has stalled for several heartbeats.

Peer is stored as a Map<Node, Peer> in each RaftMember, as each RaftGroup has its own logs and the progress of a node may differ in different groups.
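
For reference, a minimal Java sketch of Peer based on the fields above might look like the following; the names are illustrative rather than the exact ones in the code base.

    // Illustrative sketch of Peer; names are assumptions based on this document.
    import java.util.concurrent.atomic.AtomicInteger;

    public class Peer {

      // Deprecated according to this document and may be removed.
      private long nextIndex;

      // Index of the last log confirmed to exist on both the leader and this node.
      private long matchIndex = -1;

      // Consecutive heartbeats whose reported last log index was smaller than
      // the leader's and unchanged since the previous heartbeat.
      private final AtomicInteger inconsistentHeartbeatNum = new AtomicInteger();

      // Last log index reported by this node in its latest heartbeat response.
      private long lastHeartbeatIndex = -1;

      // getters and setters omitted for brevity
    }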


Status Update

NodeStatus

Except for isActivated and lastDeactivatedTime, which are introduced later in the "Node Activation" section, the other fields in NodeStatus mainly serve query coordination, so they are updated on demand by queries. When a query needs to choose one of the available replicas, it checks each replica's lastUpdateTime; if that timestamp is more than 60 seconds old, a request is sent to the node to fetch its current status, which overwrites the one in NodeStatus. After that, the latency of fetching the status is recorded as lastResponseLatency, and the current system time overwrites lastUpdateTime.
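
A sketch of this refresh logic in Java might look like the following; the 60-second constant comes from the description above, while names such as fetchStatusOf and nodeStatusMap are hypothetical.

    // Hypothetical sketch of the on-demand status refresh described above.
    private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 60_000L;

    public NodeStatus getNodeStatus(Node node) {
      NodeStatus nodeStatus = nodeStatusMap.computeIfAbsent(node, n -> new NodeStatus());
      if (System.currentTimeMillis() - nodeStatus.getLastUpdateTime()
          > NODE_STATUS_UPDATE_INTERVAL_MS) {
        long startNs = System.nanoTime();
        TNodeStatus remoteStatus = fetchStatusOf(node); // RPC to the remote node
        if (remoteStatus != null) {
          nodeStatus.setStatus(remoteStatus);
          nodeStatus.setLastResponseLatency(System.nanoTime() - startNs);
          nodeStatus.setLastUpdateTime(System.currentTimeMillis());
        }
      }
      return nodeStatus;
    }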

Peer

Peer records the log progress of a follower as known by the leader, so it is updated whenever the follower receives new logs. When an AppendEntry/AppendEntries request succeeds, whether issued for a new log or by a catch-up task, the peer's matchIndex is updated to the index of the last log sent by the request, provided matchIndex is currently smaller than that index.
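
In code, this amounts to a monotonic update, roughly like the sketch below (method names are illustrative):

    // Hypothetical sketch: advance matchIndex after a successful
    // AppendEntry/AppendEntries request; never move it backward.
    void onAppendSucceeded(Peer peer, long lastSentLogIndex) {
      if (peer.getMatchIndex() < lastSentLogIndex) {
        peer.setMatchIndex(lastSentLogIndex);
      }
    }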

Peer also records some heartbeat-related status to help the leader decide whether to start a catch-up, so it is updated after each heartbeat. If the last log index reported in the heartbeat response differs from the recorded lastHeartbeatIndex, the follower is making progress (either by itself or through a previously issued catch-up): lastHeartbeatIndex is updated to the reported index, and inconsistentHeartbeatNum is reset to zero. Otherwise, the two indices are the same; if they are also smaller than the local last log index, the follower is stale and made no progress during the last heartbeat interval, so inconsistentHeartbeatNum is increased by one, and once it reaches a threshold (currently 5), a catch-up is started if no previous one is ongoing.
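
The following Java sketch illustrates this decision, assuming hypothetical helper names such as isCatchUpRunningFor and catchUp:

    // Hypothetical sketch of heartbeat handling on the leader side.
    private static final int CATCH_UP_THRESHOLD = 5;

    void onHeartbeatResponse(Node follower, Peer peer,
                             long followerLastLogIndex, long localLastLogIndex) {
      if (followerLastLogIndex != peer.getLastHeartbeatIndex()
          || followerLastLogIndex >= localLastLogIndex) {
        // The follower made progress, or it is already up to date: reset.
        peer.setLastHeartbeatIndex(followerLastLogIndex);
        peer.getInconsistentHeartbeatNum().set(0);
      } else {
        // Same stale index as in the previous heartbeat: no progress was made.
        if (peer.getInconsistentHeartbeatNum().incrementAndGet() >= CATCH_UP_THRESHOLD
            && !isCatchUpRunningFor(follower)) {
          catchUp(follower, peer.getMatchIndex());
        }
      }
    }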


Node Activation

As mentioned above, we use the field NodeStatus.isActivated to indicate whether another node is available from the perspective of one node. This field is initialized to true, so a node always assumes other nodes are alive until it finds that some of them are unavailable. As we use Thrift for network communication, the Thrift RPC clients, which send RPC requests and wait for responses, are the first to know whether a node is down or up. If a client successfully establishes a connection and reads a response from another node, the node is activated when Client.onComplete() is called. Otherwise, there must be a connection error or a timeout, so Client.onError() is invoked and the node is deactivated. Besides, when a node restarts, it sends handshakes (messages that contain nothing but who the sender is) to all other nodes, so that when they work as coordinators, they can reactivate the node and consider redirecting requests to the newly restarted node. Fig.1 demonstrates the transition, which is mainly driven by these client callbacks.

Fig.1 Node activation transitions
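
In terms of Thrift's async client API, the transition could be driven by a callback like the sketch below; the NodeStatusManager methods and class names are illustrative, not necessarily the actual ones.

    // Hypothetical sketch: a Thrift async callback that drives node activation.
    import org.apache.thrift.async.AsyncMethodCallback;

    public class StatusAwareCallback<T> implements AsyncMethodCallback<T> {

      private final Node node;

      public StatusAwareCallback(Node node) {
        this.node = node;
      }

      @Override
      public void onComplete(T response) {
        // Any successful response proves the node is reachable again.
        NodeStatusManager.getInstance().activate(node);
      }

      @Override
      public void onError(Exception e) {
        // Connection errors and timeouts both end up here.
        NodeStatusManager.getInstance().deactivate(node);
      }
    }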

When a node is deactivated, RPC clients are not available from the client pools (null is returned directly) for actions such as log appending, queries, and catch-ups, making them fail directly without waiting for another timeout. However, other actions such as heartbeats and status updates are not controlled by this flag, because we rely on them to detect, as soon as possible, that a node has returned to the cluster; since these actions run in the background, users will not notice the failures or timeouts caused by sending requests to a node that may already be unavailable. Fig.2 shows how this flag works in a straightforward way.

Fig.2 Node activation controls RPC client retrieval
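
Combining this flag with the 10-minute expiry of lastDeactivatedTime described earlier, client retrieval could look roughly like the following sketch; the constant and helper names (DEACTIVATION_EXPIRY_MS, isAlwaysAllowed, RequestType, clientPool) are hypothetical.

    // Hypothetical sketch of client retrieval guarded by isActivated.
    private static final long DEACTIVATION_EXPIRY_MS = 10 * 60 * 1000L;

    public Client getClient(Node node, RequestType type) {
      NodeStatus status = getNodeStatus(node);
      boolean deactivationExpired =
          System.currentTimeMillis() - status.getLastDeactivatedTime() > DEACTIVATION_EXPIRY_MS;
      // Heartbeats, configuration checks, and status updates bypass the flag;
      // other requests also bypass it once the deactivation has expired.
      if (!status.isActivated() && !isAlwaysAllowed(type) && !deactivationExpired) {
        return null; // fail directly instead of waiting for another timeout
      }
      return clientPool.borrowClient(node);
    }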
