...

Code Block
For LeaderAndISRCommand: it calls on_LeaderAndISRCommand().
on_LeaderAndISRCommand(command):
1. Read the set of partitions set_P from command.
2. For each partition P in set_P
2.0 if P doesn't exist locally, call startReplica()
2.1 If the command asks this broker to be the new leader for P and this broker is not already the leader for P,
2.1.1 call becomeLeader()
2.2 If the command asks this broker to follow a leader L and this broker is not already following L,
2.2.1 call becomeFollower()
3. If the command has the INIT flag set, delete all local partitions not in set_P.
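
To make the dispatch concrete, here is a minimal Java sketch of the per-partition state update it performs. Every type and helper here (LeaderAndIsrHandler, Command, PartitionState, the stubbed methods) is an illustrative assumption, not Kafka's actual API.

Code Block
import java.util.HashMap;
import java.util.Map;

class LeaderAndIsrHandler {
    record PartitionState(int leader) {}                                // desired leader broker id
    record Command(Map<String, PartitionState> partitions, boolean init) {}

    private final int brokerId;
    private final Map<String, Integer> currentLeader = new HashMap<>(); // local view: partition -> leader

    LeaderAndIsrHandler(int brokerId) { this.brokerId = brokerId; }

    void onLeaderAndIsrCommand(Command cmd) {
        for (var e : cmd.partitions().entrySet()) {
            String p = e.getKey();
            int newLeader = e.getValue().leader();
            if (!currentLeader.containsKey(p)) startReplica(p);                        // step 2.0
            Integer oldLeader = currentLeader.get(p);
            if (newLeader == brokerId && (oldLeader == null || oldLeader != brokerId))
                becomeLeader(p);                                                       // step 2.1
            else if (newLeader != brokerId && (oldLeader == null || oldLeader != newLeader))
                becomeFollower(p, newLeader);                                          // step 2.2
            currentLeader.put(p, newLeader);
        }
        if (cmd.init())  // step 3: drop local partitions absent from the command (directory deletion elided)
            currentLeader.keySet().removeIf(p -> !cmd.partitions().containsKey(p));
    }

    private void startReplica(String p) { /* see startReplica() below */ }
    private void becomeLeader(String p) { /* see becomeLeader() below */ }
    private void becomeFollower(String p, int leaderId) { /* see becomeFollower() below */ }
}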

becomeLeader(r: Replica, command)
{
  stop the ReplicaFetcherThread to the old leader   // after this, no more messages from the old leader can be appended to r
  r.leaderAndISRZKVersion = command.leaderAndISRZKVersion
  r.ISR = command.ISR
  stop the HW checkpoint thread for r
  r.isLeader = true                                 // enables reads/writes to this partition on this broker
  start a commit thread on r
}
Note that the new leader's HW could be behind the HW of the previous leader. Therefore, immediately after the leadership transition,
it is possible for a consumer (client) to ask for an offset larger than the new leader's HW. To handle this case, if the consumer is
asking for an offset between the leader's HW and its LEO, the broker can simply return an empty set. If the requested offset is larger
than the LEO, the broker still returns OffsetOutOfRangeException.
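
This boundary behavior is easy to express in code. The sketch below is a hedged Java illustration of the rule just described; the class, method, and stub names are assumptions for this example only.

Code Block
import java.util.List;

class FetchBoundary {
    static class OffsetOutOfRangeException extends RuntimeException {}

    // hw: last committed offset at the leader; leo: log end offset; hw <= leo.
    static List<byte[]> readMessages(long requestedOffset, long hw, long leo) {
        if (requestedOffset > leo) throw new OffsetOutOfRangeException(); // past the end of the log
        if (requestedOffset > hw) return List.of();                       // between HW and LEO: empty set, not an error
        return readCommitted(requestedOffset, hw);                        // serve committed messages only
    }

    static List<byte[]> readCommitted(long from, long to) { return List.of(); } // log-read stub
}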

becomeFollower(r: Replica)
{
  stop the ReplicaFetcherThread to the old leader
  r.isLeader = false                                // disables reads/writes to this partition on this broker
  stop the commit thread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
}
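
The essential step in becomeFollower() is the truncation to r.hw: entries past the HW may never have been committed and can conflict with the new leader's log. A minimal Java sketch, with all names assumed for illustration:

Code Block
class FollowerTransition {
    static class Replica {
        volatile boolean isLeader;
        long hw;   // last committed offset known locally
        long leo;  // log end offset
    }

    static void becomeFollower(Replica r, int newLeaderId) {
        stopFetcher(r);                 // no further appends from the old leader
        r.isLeader = false;             // reject client reads/writes for this partition
        stopCommitThread(r);
        r.leo = r.hw;                   // truncate: drop possibly-uncommitted entries past the HW
        startFetcher(r, newLeaderId, r.leo);
    }

    static void stopFetcher(Replica r) {}
    static void stopCommitThread(Replica r) {}
    static void startFetcher(Replica r, int leaderId, long fromOffset) {}
}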

startReplica(r: Replica)
 {
  create the partition directory locally, if not present
  start the HW checkpoint thread for r
}



For StopReplicaCommand: it calls on_StopReplicaCommand().
on_StopReplicaCommand(command):
1. Read the list of partitions from command.
2. For each such partition P
2.1 call stopReplica() on P

stopReplica(r: Replica) {
  stop the ReplicaFetcherThread associated with r, if any.
  stop the HW checkpoint thread for r
  delete the partition directory locally, if present
}

...

Code Block
The Reassignment Agent maintains an in-memory map reassigned_partition_map that tracks all partitions to be reassigned.

add_reassigned_partitions(set_p)
1. for each partition p in set_p
1.1  if (p not in reassigned_partition_map)
1.1.1  set p's state to INIT and add p to reassigned_partition_map
1.1.2  wake up the reassignment agent

on_wakeup()
1. for each partition p in reassigned_partition_map
2.   if partition p in INIT state
2.1    issue LeaderAndISR command to the brokers in RAR (to start bootstrapping new replicas)
2.2    mark partition p as in IN_PROGRESS state
3.   else if partition p in IN_PROGRESS state
3.1    read the ISR from the LeaderAndISR path in ZK
3.2    if ISR == AR+RAR // this means that all replicas have caught up and we want to switch the current ISR and AR to RAR
3.2.1    get a controller lock
3.2.2    conditionally update ISR in the LeaderAndISR path in ZK (if the version of the path has changed between 3.1 and 3.2.2, go back to 3.1)
3.2.3    send a LeaderAndISR command to inform the leader of the new ISR
3.2.4    send a StopReplica command to the brokers in the current AR (but not in RAR).
3.2.5    update /brokers/topics/[topic] to change AR to RAR
3.2.6    delete /brokers/partitions_reassigned/[topic-part]
3.2.7    release controller lock
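
The agent's behavior is a small two-state machine per partition. The following Java sketch assumes placeholder names throughout (the ZK reader and command senders are stubs) and elides the agent thread's wait loop:

Code Block
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ReassignmentAgent {
    enum State { INIT, IN_PROGRESS }
    private final Map<String, State> reassignedPartitionMap = new ConcurrentHashMap<>();

    synchronized void addReassignedPartitions(Set<String> partitions) {
        for (String p : partitions)
            if (reassignedPartitionMap.putIfAbsent(p, State.INIT) == null)
                notifyAll();                              // wake up the agent thread
    }

    void onWakeup() {
        for (var e : reassignedPartitionMap.entrySet()) {
            String p = e.getKey();
            switch (e.getValue()) {
                case INIT -> {
                    sendLeaderAndIsrToRar(p);             // start bootstrapping the new replicas
                    reassignedPartitionMap.put(p, State.IN_PROGRESS);
                }
                case IN_PROGRESS -> {
                    if (isrEqualsArPlusRar(p))            // all replicas have caught up
                        completeReassignment(p);          // steps 3.2.1 - 3.2.7, under the controller lock
                }
            }
        }
    }

    boolean isrEqualsArPlusRar(String p) { return false; }              // read ISR from ZK (stub)
    void sendLeaderAndIsrToRar(String p) {}                             // stub
    void completeReassignment(String p) { reassignedPartitionMap.remove(p); }
}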
G. Follower fetching from leader and leader advances HW

A follower keeps sending ReplicaFetchRequests to the leader. The leader advances its HW once every replica in the ISR has received messages up to that offset. The processes at the leader and the follower are described below.

Code Block

ReplicaFetchRequest {
   topic: String
   partition_id: Int
   replica_id: Int
   offset: Long
}

ReplicaFetchResponse {
   hw: Long             // the offset of the last message committed at the leader
   messages: MessageSet	// fetched messages
}

At the leader, on receiving a ReplicaFetchRequest, it calls on_ReplicaFetchRequest.
on_ReplicaFetchRequest(f: ReplicaFetchRequest) {
  leader = getLeaderReplica(f.topic, f.partition_id)
  if(leader == null) throw NotLeaderException
  response = new ReplicaFetchResponse
  getReplica(f.topic, f.partition_id, f.replica_id).leo = f.offset

  call maybe_change_ISR()                            // see if we need to shrink or expand ISR
  leader.hw = min of leo of every replica in ISR     // try to advance HW
  // trigger potential acks to produce requests

  response.messages = fetch messages starting from f.offset from leader.log
  response.hw = leader.hw
  send response back
}
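
The HW advance in the handler above reduces to taking the minimum LEO across the ISR, with the invariant that the leader's HW never moves backwards. A small Java sketch (names assumed for illustration):

Code Block
import java.util.Map;

class HighWatermark {
    // isrLeo: ISR replica id -> that replica's LEO as last reported to the leader.
    static long advanceHw(Map<Integer, Long> isrLeo, long currentHw) {
        long minLeo = isrLeo.values().stream().mapToLong(Long::longValue).min().orElse(currentHw);
        return Math.max(currentHw, minLeo);  // the slowest ISR member pins the HW
    }
}

For example, advanceHw(Map.of(0, 10L, 1, 8L, 2, 9L), 7L) returns 8: the HW advances only as far as the slowest in-sync replica has fetched.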

maybe_change_ISR() {
  // If a follower is slow or is not active at all, the leader will want to take it out of ISR so that it can commit messages
  // with fewer replicas.
  find the smallest leo (leo_min) of every replica in ISR
  if ( leader.leo - leo_min > MAX_BYTES_LAG || the replica with leo_min hasn't updated leo for more than MAX_TIME_LAG)
    newISR = ISR - replica with leo_min

  // If a follower has fully caught up, the leader will want to add it to ISR.
  for each replica r not in ISR
    if ( leader.leo - r.leo < MIN_BYTES_LAG)
      newISR = ISR + r

  update the LeaderAndISR path in ZK with newISR
  leader.ISR = newISR
}
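
The shrink/expand policy above can be sketched as a pure function over the leader's view of its followers. In the Java illustration below, maxBytesLag, maxTimeLagMs, and minBytesLag mirror the pseudocode's MAX_BYTES_LAG, MAX_TIME_LAG, and MIN_BYTES_LAG; all types and names are assumptions for this example:

Code Block
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class IsrPolicy {
    record FollowerState(long leo, long lastUpdateMs) {}

    static Set<Integer> maybeChangeIsr(long leaderLeo, long nowMs, Set<Integer> isr,
                                       Map<Integer, FollowerState> replicas,
                                       long maxBytesLag, long maxTimeLagMs, long minBytesLag) {
        Set<Integer> newIsr = new HashSet<>(isr);

        // Shrink: find the most-lagging ISR member and drop it if it is too far behind or silent for too long.
        int slowest = -1;
        long leoMin = Long.MAX_VALUE;
        for (int id : isr) {
            long leo = replicas.get(id).leo();
            if (leo < leoMin) { leoMin = leo; slowest = id; }
        }
        if (slowest >= 0 && (leaderLeo - leoMin > maxBytesLag
                || nowMs - replicas.get(slowest).lastUpdateMs() > maxTimeLagMs))
            newIsr.remove(slowest);

        // Expand: re-admit followers whose LEO is close enough to the leader's LEO.
        for (var e : replicas.entrySet())
            if (!newIsr.contains(e.getKey()) && leaderLeo - e.getValue().leo() < minBytesLag)
                newIsr.add(e.getKey());

        return newIsr;  // the caller persists newIsr to the LeaderAndISR path in ZK before applying it
    }
}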

At the follower: ReplicaFetcherThread for Replica r
run() {
  while(true) {
    send ReplicaFetchRequest to leader and get response: ReplicaFetchResponse back
    append response.messages to r's log
    r.hw = response.hw
    advance offset in ReplicaFetchRequest
  }
}
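
The follower's loop translates directly into Java. In the sketch below, the assumption that the fetch offset advances by one per message is for illustration only, as are all class and method names:

Code Block
class ReplicaFetcherLoop implements Runnable {
    static class Replica { long leo; long hw; }
    record FetchResponse(long hw, byte[][] messages) {}

    private final Replica r;
    ReplicaFetcherLoop(Replica r) { this.r = r; }

    @Override public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            FetchResponse resp = fetchFromLeader(r.leo);   // ReplicaFetchRequest at offset r.leo
            for (byte[] m : resp.messages()) appendToLog(m);
            r.leo += resp.messages().length;               // advance the next fetch offset
            r.hw = resp.hw();                              // the follower adopts the leader's HW
        }
    }

    FetchResponse fetchFromLeader(long offset) { return new FetchResponse(0, new byte[0][]); }  // RPC stub
    void appendToLog(byte[] message) {}                                                         // log stub
}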
Discussions:
1. End-to-end latency during a broker failure:

...