Comment: Migrated to Confluence 5.3

...

Code Block
Replica {                                // a replica of a partition
  broker_id               : int
  partition               : Partition
  log                     : Log          // local log associated with this replica
  hw                      : long         // offset of the last committed message
  leo                     : long         // log end offset
  isLeader                : Boolean      // is this replica leader
}

Partition {                              //a partition in a topic
  topic                   : string
  partition_id            : int
  leader                  : Replica      // the leader replica of this partition
  ISR                     : Set[Replica] // In-sync replica set, maintained at the leader
  AR                      : Set[Replica] // All replicas assigned for this partition
  LeaderAndISRVersionInZK : long         // version id of the LeaderAndISR path; used to conditionally update the LeaderAndISR path in ZK
}



LeaderAndISRRequest {
  request_type_id         : int16 // the request id
  version_id              : int16 // the version of this request
  client_id               : int32 // this can be the broker id of the controller
  ack_timeout             : int32 // the time in ms to wait for a response
  isInit                  : byte  // whether this is the first command issued by a controller
  leaderAndISRMap         : Map[(topic: String, partitionId: int32) => LeaderAndISR] // a map of LeaderAndISR
}


LeaderAndISR {
  leader                  : int32          // broker id of the leader
  leaderEpoc              : int32          // leader epoch, incremented on each leadership change
  ISR                     : Set[int32]     // a set of the id of each broker in ISR
  zkVersion               : int64          // version of the LeaderAndISR path in ZK
}


LeaderAndISRResponse {
  version_id              : int16 // the version of this request
  responseMap             : Map[(topic: String, partitionId: int32) => int16] // a map of error code
}

StopReplicaRequest {
  request_type_id         : int16 // the request id
  version_id              : int16 // the version of this request
  client_id               : int32 // this can be the broker id of the controller
  ack_timeout             : int32 // the time in ms to wait for a response
  stopReplicaSet          : Set[(topic: String, partitionId: int)] // a set of partitions to be stopped
}


StopReplicaResponse {
  version_id              : int16 // the version of this request
  responseMap             : Map[(topic: String, partitionId: int32) => int16] // a map of error code
}
A. Failover during broker failure.

Controller watches child changes of /brokers/ids path. When the watcher gets triggered, it calls on_broker_change().

Code Block

on_broker_change():
1. Get the current live brokers from BrokerPath in ZK
2. Determine set_p, a set of partitions whose leader is no longer live or whose ISR will change because a broker is down.
3. For each partition P in set_p
3.1 Read the current ISR of P from LeaderAndISR path in ZK
3.2 Determine the new leader and the new ISR of P:
If ISR has at least 1 broker in the live broker list, select one of those brokers as the new leader. The new ISR includes all brokers in the current ISR that are alive.
Otherwise, select one of the live brokers in AR as the new leader and set that broker as the new ISR (potential data loss in this case).
Finally, if none of the brokers in AR is alive, set the new leader to -1.
3.3 Write the new leader, ISR and a new epoc (increase current epoc by 1) in /brokers/topics/[topic]/[partition_id]/leaderAndISR.
This write has to be done conditionally. If the version of the LeaderAndISR path has changed between 3.1 and 3.3, go back to 3.1.
4. Send a LeaderAndISRCommand (containing the new leader/ISR and the ZK version of the LeaderAndISR path) for each partition in set_p to the affected brokers.
For efficiency, we can put multiple commands in one RPC request.
(Ideally we want to use ZK multi to do the reads and writes in step 3.1 and 3.3.)
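The read, elect, conditional-write loop of steps 3.1 to 3.3 can be sketched as follows. This is only an illustration under stated assumptions: FakeZk is a hypothetical in-memory stand-in for the versioned LeaderAndISR path (not a real ZooKeeper client), the choice of the lowest live broker id as leader is arbitrary since the design only says "select one", and keeping the old ISR when no replica in AR is alive is an assumption because the design does not specify it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LeaderAndISR:
    leader: int            # broker id of the leader, -1 if no replica is alive
    leader_epoch: int      # incremented on each leadership change
    isr: frozenset         # ids of the brokers in the in-sync replica set


class VersionConflict(Exception):
    """Raised when the path version changed between the read and the write."""


class FakeZk:
    """Hypothetical in-memory stand-in for one versioned LeaderAndISR path."""

    def __init__(self, value):
        self.value = value
        self.version = 0

    def read(self):
        return self.value, self.version

    def conditional_write(self, value, expected_version):
        if expected_version != self.version:
            raise VersionConflict()
        self.value = value
        self.version += 1
        return self.version


def elect(current, assigned_replicas, live):
    """Step 3.2: pick the new leader and ISR from the live brokers."""
    live_isr = [b for b in sorted(current.isr) if b in live]
    if live_isr:
        # Assumption: any live ISR member is acceptable; take the lowest id.
        leader, isr = live_isr[0], frozenset(live_isr)
    else:
        live_ar = [b for b in assigned_replicas if b in live]
        if live_ar:
            # Leader comes from outside the ISR: potential data loss.
            leader, isr = live_ar[0], frozenset([live_ar[0]])
        else:
            # Assumption: the ISR is left unchanged when nothing is alive.
            leader, isr = -1, current.isr
    return LeaderAndISR(leader, current.leader_epoch + 1, isr)


def fail_over(zk, assigned_replicas, live):
    """Steps 3.1 to 3.3, retried until the conditional write succeeds."""
    while True:
        current, version = zk.read()                        # step 3.1
        updated = elect(current, assigned_replicas, live)   # step 3.2
        try:
            new_version = zk.conditional_write(updated, version)  # step 3.3
        except VersionConflict:
            continue  # the path changed underneath us: go back to 3.1
        return updated, new_version
```

The returned version is what the controller would put into the LeaderAndISRCommand in step 4, so the broker can later update the same path conditionally.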
B. Creating/deleting topics.

The controller watches child changes of /brokers/topics. When the watcher gets triggered, it calls on_topic_change().

Code Block

on_topic_change():
The controller keeps in memory a list of existing topics.
1. If a new topic is created, read the TopicPath in ZK to get the topic's replica assignment.
1.1. Call init_leaders() on all newly created partitions.
2. If a topic is deleted, send the StopReplicaCommand to all affected brokers.

init_leaders(set_p):
Input: set_p, a set of partitions
0. Read the current live broker list from the BrokerPath in ZK
1. For each partition P in set_p
1.1 Select one of the live brokers in AR as the new leader and set all live brokers in AR as the new ISR.
1.2 Write the new leader and ISR in /brokers/topics/[topic]/[partition_id]/leaderAndISR.
2. Send the LeaderAndISRCommand to the affected brokers. Again, for efficiency, the controller can send multiple commands in 1 RPC.
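A minimal sketch of init_leaders() and the per-broker batching mentioned in step 2 is shown below. The dictionary shapes, the helper name group_by_broker, and the choice of the first live replica in AR as leader are assumptions made for illustration. Partitions with no live replica are simply skipped here, which the design does not specify, and the ZK write of step 1.2 is omitted.

```python
from collections import defaultdict


def init_leaders(assignment, live_brokers):
    """Pick a leader and ISR for each new partition.

    assignment maps (topic, partition_id) to the list of assigned
    replica broker ids (AR). Returns a map from partition to its
    initial leader and ISR.
    """
    result = {}
    for tp, ar in assignment.items():
        live_ar = [b for b in ar if b in live_brokers]
        if not live_ar:
            continue  # assumption: nothing to elect, leave the partition out
        # Step 1.1: one live broker in AR leads, all live brokers in AR form the ISR.
        result[tp] = {"leader": live_ar[0], "isr": live_ar}
    return result


def group_by_broker(leaders):
    """Batch the per-partition commands so each broker gets one RPC."""
    batches = defaultdict(dict)
    for tp, state in leaders.items():
        # Assumption: the "affected brokers" are the live replicas of the partition.
        for broker in state["isr"]:
            batches[broker][tp] = state
    return dict(batches)


if __name__ == "__main__":
    leaders = init_leaders({("t", 0): [1, 2, 3], ("t", 1): [3, 1, 2]}, {2, 3})
    for broker, batch in sorted(group_by_broker(leaders).items()):
        print(broker, sorted(batch))
```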
C. Broker acts on commands from the controller.

Each broker listens to commands from the controller through RPC. 

Code Block

For LeaderAndISRCommand: it calls on_LeaderAndISRCommand().
on_LeaderAndISRCommand(command):
1. Read the set of partitions set_p from command.
2. For each partition P in set_p
2.0 If P doesn't exist locally, call startReplica()
2.1 If the command asks this broker to be the new leader for P and this broker is not already the leader for P,
2.1.1 call becomeLeader()
2.2 If the command asks this broker to follow a leader L and the broker is not already following L
2.2.1 call becomeFollower()
3. If the command has a flag INIT, delete all local partitions not in set_p.

becomeLeader(r: Replica, command) {
stop the ReplicaFetcherThread to the old leader   //after this, no more messages from the old leader can be appended to r
r.partition.ISR = command.ISR
r.isLeader = true                                 //enables reads/writes to this partition on this broker
r.partition.LeaderAndISRVersionInZK = command.LeaderAndISRVersionInZK
start a commit thread on r
}
Note that the new leader's HW could be behind the HW of the previous leader. Therefore, immediately after the leadership transition,
it is possible for a consumer (client) to ask for an offset larger than the new leader's HW. To handle this case, if the consumer is
asking for an offset between the leader's HW and LEO, the broker could just return an empty set. If the requested offset is larger
than LEO, the broker would still return OffsetOutOfRangeException.

becomeFollower(r: Replica) {
stop the ReplicaFetcherThread to the old leader
r.isLeader = false                                //disables reads/writes to this partition on this broker
stop the commit thread, if any
truncate the log to r.hw
start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
}

startReplica(r: Replica) {
create the partition directory locally, if not present
start the HW checkpoint thread for r
}



For StopReplicaCommand: it calls on_StopReplicaCommand().
on_StopReplicaCommand(command):
1. Read the list of partitions from command.
2. For each such partition P
2.1 call stopReplica() on P

stopReplica(r: Replica) {
  stop the ReplicaFetcherThread associated with r, if any.
  stop the HW checkpoint thread for r
  delete the partition directory locally, if present
}
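The broker-side dispatch in on_LeaderAndISRCommand() can be sketched as a small state machine. The Broker class, its fields, and the recorded action tuples are hypothetical and exist only so the transitions can be observed. Real threads, logs, and ZK versions are left out, and the command is reduced to a map from partition to leader id.

```python
class Broker:
    """Hypothetical broker that records which transitions a command triggers."""

    def __init__(self, broker_id):
        self.broker_id = broker_id
        # (topic, partition_id) -> {"is_leader": bool, "leader": broker id or None}
        self.replicas = {}
        self.actions = []

    def on_leader_and_isr_command(self, leader_by_partition, is_init=False):
        for tp, leader in leader_by_partition.items():
            if tp not in self.replicas:                       # step 2.0
                self.replicas[tp] = {"is_leader": False, "leader": None}
                self.actions.append(("startReplica", tp))
            replica = self.replicas[tp]
            if leader == self.broker_id:
                if not replica["is_leader"]:                  # step 2.1
                    replica["is_leader"] = True
                    replica["leader"] = leader
                    self.actions.append(("becomeLeader", tp))
            elif replica["is_leader"] or replica["leader"] != leader:
                replica["is_leader"] = False                  # step 2.2
                replica["leader"] = leader
                self.actions.append(("becomeFollower", tp))
        if is_init:                                           # step 3
            stale = [tp for tp in self.replicas if tp not in leader_by_partition]
            for tp in stale:
                del self.replicas[tp]
                self.actions.append(("delete", tp))


if __name__ == "__main__":
    broker = Broker(1)
    broker.on_leader_and_isr_command({("t", 0): 1, ("t", 1): 2})
    broker.on_leader_and_isr_command({("t", 0): 2}, is_init=True)
    for action in broker.actions:
        print(action)
```

Replaying the same command is a no-op in this sketch, which matches the "not already the leader" and "not already following L" guards in steps 2.1 and 2.2.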
D. Handling controller failure.

...

For reference, HBase only updates the metadata (for client routing) after the regionserver responds to close/open region commands. So, one would think that instead of the controller directly updating the LeaderAndISR path, we can let each broker update that path after it completes the execution of the command. There is actually a critical reason that the leaderAndISR path has to be updated by the controller. This is because we rely on the leaderAndISR path in ZK to synchronize between the controller and the leader of a partition. After the controller makes a leadership change decision, it doesn't want the ISR to be changed by the current leader any more. Otherwise, the newly elected leader could be taken out of ISR by the current leader before the new leader takes over the leadership. By publishing the new leader immediately in the leaderAndISR path, the controller prevents the current leader from updating the ISR any more.
One possibility is to use another ZK path, ExternalView, for client routing. The controller only updates ExternalView after the broker responds positively to the leadership change command. There is a tradeoff between using 1 ExternalView path for all partitions and 1 ExternalView path per partition. The former has less ZK overhead, but potentially forces unnecessary rebalancing on the consumers. Another way to think about this is that in the normal case, leadership change commands are executed very quickly, so we can probably rely on client-side retry logic to handle the transition period. In the uncommon case that it takes too long for a broker to become a leader (likely due to a bug), the controller will get a timeout and can trigger an alert so that an admin can take a look at it. So, we can probably start without the ExternalView path and reconsider it if it's really needed.

...

7. Dealing with offsets beyond HW in fetch requests during leadership change:

In general, the HW in the follower always lags that in the leader. So, during a leadership change, a consumer client could be requesting an offset between the new leader's HW and LEO. Normally, the server will return an OffsetOutOfRangeException to the client. In this particular case, the client request is actually valid. To deal with this case, the server can return an empty message set to the consumer if the requested offset is between HW and LEO.
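The rule above can be sketched as a small fetch function. This is an illustration only: the log is modeled as a Python list whose indices are offsets, hw is treated as an exclusive upper bound on the messages visible to consumers, and the exception class is a local stand-in for OffsetOutOfRangeException. All of these are assumptions rather than the actual server code.

```python
class OffsetOutOfRange(Exception):
    """Local stand-in for the server's OffsetOutOfRangeException."""


def fetch(log, hw, requested_offset):
    """Return the committed messages starting at requested_offset.

    log is the leader's local log (index == offset), so LEO is len(log).
    hw is assumed to be exclusive: offsets below hw are committed.
    """
    leo = len(log)
    if requested_offset < 0 or requested_offset > leo:
        raise OffsetOutOfRange(requested_offset)
    if requested_offset >= hw:
        # Between HW and LEO: valid right after a leadership change,
        # but nothing is committed there yet, so return an empty set.
        return []
    return log[requested_offset:hw]


if __name__ == "__main__":
    log = ["m0", "m1", "m2", "m3"]   # LEO is 4
    print(fetch(log, 2, 0))          # committed messages below HW
    print(fetch(log, 2, 3))          # empty: offset is between HW and LEO
```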

8. Can follower keep up with the leader?

In general, we need to have as much I/O parallelism in the follower as in the leader. Probably need to think a bit more on this.

...