
...

Motivation

This proposal follows KIP-500, which lays out an architecture for Kafka based on a replicated metadata log which is maintained by a self-managed quorum. Log replication is at the heart of Kafka, and it is also at the heart of consensus protocols such as Raft. The replication protocol that we spent years improving and validating is actually not too different from Raft if you take a step back. There is a single leader at any time which is enforced by a monotonically increasing epoch; messages are unique by offset and epoch; and there is a protocol to reconcile log inconsistencies following leader changes using offset and epoch information from the log. The gaps between Kafka's replication protocol and Raft are the following:

...

Essentially we have relied on Zookeeper's consensus protocol up to now to enforce consistent replication semantics. With KIP-500, we are taking a step into the wild. This proposal is about bridging the gaps in the Kafka replication protocol in order to support the controller's metadata quorum. 

The protocol we are proposing is a sort of Raft dialect which is heavily influenced by Kafka's log replication protocol. For example, it is pull-based unlike Raft which is push-based. This adds some complication, but mostly it's just looking at Raft through a mirror. We also have favored Kafka terminology (offset/epoch) over traditional Raft terminology (index/term). Think of the protocol as beginning with Kafka log replication and adding Raft leader election. 

...

This work does not define the messages that will be used by the Controller to maintain the state of the cluster. That will come in a follow-up KIP.

At a high level, this proposal treats the metadata log as a topic with a single partition. The benefit of this is that many of the existing APIs can be reused. For example, it allows users to use a consumer to read the contents of the metadata log for debugging purposes. We are also trying to pave the way for normal partition replication through Raft as well as eventually supporting metadata sharding with multiple partitions.  In our prototype implementation, we have assumed the name __cluster_metadata for this topic, but this is not a formal part of this proposal.

Key Concepts

If you are familiar with Kafka replication, most of the same concepts apply to Raft replication; however, we replace ISR commit semantics with quorum commit semantics and we add leader election.

Leader Epoch: Known as a "term" in Raft, this is a monotonically increasing sequence which is incremented during every leader election. This is used both to fence zombies and to reconcile logs which have diverged. Log records are uniquely identified by the offset in the log and the epoch of the leader that appended them.

High watermark: Although there is no notion of an ISR for a quorum-based replication protocol, we still have a high watermark to control when a record is considered "committed." This is the largest offset which is replicated to a majority of the voters. The protocol is designed to guarantee that committed records are not lost.
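
As an illustration of the quorum commit rule, here is a minimal sketch (hypothetical names, not code from this proposal) of how a leader could derive the high watermark from the log end offsets reported by the voters:

Code Block
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

public class HighWatermarkExample {

    // Returns the largest offset replicated to a majority of voters, given each
    // voter's log end offset (the leader includes its own end offset in the list).
    static long highWatermark(Collection<Long> voterEndOffsets) {
        List<Long> sorted = new ArrayList<>(voterEndOffsets);
        sorted.sort(Comparator.reverseOrder());
        int majority = sorted.size() / 2 + 1;
        // The offset in position "majority" (1-based) is covered by at least a majority.
        return sorted.get(majority - 1);
    }

    public static void main(String[] args) {
        // Three voters: leader at offset 10, followers at 8 and 5 => high watermark 8.
        System.out.println(highWatermark(List.of(10L, 8L, 5L)));
    }
}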

...

Leader: After a candidate gathers a majority of votes from its peers (including itself), it will become the leader for the current epoch. For each epoch there will be only one leader which takes client requests for new records. The single leader and all other voters form the quorum.

Follower: A voter which has either cast its vote for a current candidate or which is fetching from the current leader is known as a follower. Followers can become candidates after a configurable timeout if they have not heard from the current leader or if an active election does not conclude.

Observer: An observer is a replica which is not eligible to vote and cannot become a leader. But like the voters, it can still fetch from the leader to replicate records. In other words, an observer is only responsible for discovering the leader and replicating the log.

...

This proposal requires a persistent log as well as a separate file to maintain the current quorum state. The latter is needed both to support dynamic reassignment of the voters and for the persistence of vote state. Below we define the structure of this state.

...

  • Vote: Sent by a voter to initiate an election.
  • BeginQuorumEpoch: Used by a new leader to inform the voters of its status.
  • EndQuorumEpoch: Used by a leader to gracefully step down and allow a new election.
  • Fetch: Sent by voters and observers to the leader in order to replicate the log.
  • DiscoverBrokers: Used to discover or view current quorum state when bootstrapping a broker.

There are also two administrative APIs:

  • AlterPartitionReassignments: Administrative API to change the members of the quorum.
  • DescribeQuorum: Administrative API to list the replication state (i.e. lag) of the voters.

...

  1. The core requests have a field for the clusterId. This is validated when a request is received to ensure that misconfigured brokers will not cause any harm to the cluster.
  2. All requests save Vote have a field for the leader epoch. Voters and leaders are always expected to ensure that the request epoch is consistent with their own and to return an error if it is not (a sketch of this common validation follows the list).
  3. We piggyback current leader and epoch information on all responses. This reduces the latency to discover leader changes.
  4. Although this KIP only expects a single Raft quorum implementation initially, we see it as beneficial to keep the door open for a multi-raft architecture in the long term, for use cases such as a sharded controller or general quorum-based topic replication. So all the core RPCs are designed as "batch" APIs from the beginning, even though they will only be used with the single Raft quorum of the metadata topic. 
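
To make items 1 and 2 concrete, here is a minimal sketch (hypothetical names, not the actual implementation) of the common validation a recipient could apply to the clusterId and leader epoch carried by a request:

Code Block
public class RequestValidationExample {

    enum Errors { NONE, INVALID_CLUSTER_ID, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    // Sketch of the common validation applied to the clusterId and leader epoch of a request.
    static Errors validate(String requestClusterId, String localClusterId,
                           int requestEpoch, int localEpoch) {
        if (localClusterId != null && !localClusterId.equals(requestClusterId)) {
            return Errors.INVALID_CLUSTER_ID;
        }
        if (requestEpoch < localEpoch) {
            return Errors.FENCED_LEADER_EPOCH;   // the sender is behind
        }
        if (requestEpoch > localEpoch) {
            return Errors.UNKNOWN_LEADER_EPOCH;  // unexpected: we have not seen this epoch yet
        }
        return Errors.NONE;
    }
}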

Note that this protocol is only concerned with leader election and log replication. It does not specify what log entries are appended to the leader's log nor how they will be received by the leader. Typically this would be through specific management APIs. For example, KIP-497 adds an AlterISR API. When the leader of the metadata quorum (i.e. the controller) receives an AlterISR request, it will append an entry to its log.

Error Codes

Additionally, we have taken log compaction out of scope because it is tied to the semantics of the metadata log. See KIP-630 for more details on compaction.

Below we identify the error codes that will be used in this protocol:

  • INVALID_CLUSTER_ID: The request either included a clusterId which does not match the one expected by the leader or failed to include a clusterId when one was expected.
  • FENCED_LEADER_EPOCH: The leader epoch in the request is smaller than the latest known to the recipient of the request.
  • UNKNOWN_LEADER_EPOCH: The leader epoch in the request is larger than expected. Note that this is an unexpected error. Unlike normal Kafka log replication, it cannot happen that the follower receives the newer epoch before the leader.
  • OFFSET_OUT_OF_RANGE: Used in the Fetch API to indicate that the follower has fetched from an invalid offset and should truncate to the offset/epoch indicated in the response.
  • NOT_LEADER_FOR_PARTITION: Used in DescribeQuorum and AlterPartitionReassignments to indicate that the recipient of the request is not the current leader.
  • INVALID_QUORUM_STATE: This error code is reserved for cases when a request conflicts with the local known state. For example, if two separate nodes try to become leader in the same epoch, then it indicates an illegal state change.
  • INCONSISTENT_VOTER_SET: Used when the request contains inconsistent membership.

Below we describe the schema and behavior of each of these APIs. 

Vote

The Vote API is used by voters to hold an election. As mentioned above, the main difference from Raft is that this protocol is pull-based. Voters send fetch requests to the leader in order to replicate from the log. These fetches also serve as a liveness check for the leader. If a voter perceives the leader as down, it will hold a new election and declare itself a candidate. A voter will begin a new election under three conditions:

...

A voter triggers an election by first voting for itself and updating the quorum-state file. Once this state is persistent, the voter becomes a candidate and sends a VoteRequest to all the other voters. Note that voters, including candidates, are not allowed to change their vote once it has been cast in a given epoch.

New elections are always delayed by a random time which is bounded by quorum.election.backoff.max.ms. This is part of the Raft protocol and is meant to prevent gridlocked elections. For example, with a quorum size of three, if only two voters are online, then we have to prevent repeated elections in which each voter declares itself a candidate and refuses to vote for the other.

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "VoteRequest",
  "validVersions": "0",
  "flexibleVersion": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      { "name": "Topics", "type": "[]VoteTopicRequest", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]VotePartitionRequest", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
              {"name": "CandidateEpoch", "type": "int32", "versions": "0+",
              "about": "The bumped epoch of the candidate sending the request"},
              {"name": "CandidateId", "type": "int32", "versions": "0+",
              "about": "The ID of the voter sending the request"},
              {"name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
              "about": "The epoch of the last record written to the metadata log"},
              {"name": "LastOffset", "type": "int64", "versions": "0+",
              "about": "The offset of the last record written to the metadata log"}
            ]
          }
        ]
      }
  ]
}

As noted above, the request follows the convention of other batched partition APIs, such as Fetch and Produce. Initially we expect that only a single topic partition will be included, but this leaves the door open for "multi-raft" support.

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "VoteResponse",
  "validVersions": "0",
  "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+"},
      { "name": "Topics", "type": "[]VoteTopicResponse", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]VotePartitionResponse", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
              {"name": "ErrorCode", "type": "int16", "versions": "0+"},
              {"name": "LeaderId", "type": "int32", "versions": "0+",
              "about": "The ID of the current leader or -1 if the leader is unknown."},
              {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
              "about": "The latest known leader epoch"},
              {"name": "VoteGranted", "type": "bool", "versions": "0+"
              "about": "True if the vote was granted and false otherwise"}
            ]
          }
        ]
      }
  ]
}

...

  1. First it checks whether an epoch larger than the candidate epoch from the request is known. If so, the vote is rejected.
  2. Next it checks whether it has already voted in that candidate epoch. If it has, then it only grants the vote if the candidate id matches the id that was already voted for. Otherwise, the vote is rejected.
  3. If the candidate epoch is larger than the currently known epoch:
    1. Check whether CandidateId is one of the expected voters. If not, then reject the vote. The candidate in this case may have been part of an incomplete voter reassignment, so the voter should accept the epoch bump and itself begin a new election.
    2. Check that the candidate's log is at least as up-to-date as its own (see above for the comparison rules). If it is, then grant the vote by first updating the quorum-state file and then returning the response with VoteGranted set to true; otherwise reject the request in the response. A sketch of this decision logic follows the list.
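
The following is a minimal sketch of that decision logic, assuming hypothetical local state fields (currentEpoch, votedCandidateId, lastEpoch, lastEndOffset); it is not the actual implementation:

Code Block
import java.util.Set;

public class VoteGrantingExample {

    // Hypothetical local state of a voter.
    int currentEpoch;
    int votedCandidateId = -1;   // -1 means no vote has been cast in currentEpoch
    Set<Integer> voters;         // the expected voter set
    int lastEpoch;               // epoch of the last record in the local log
    long lastEndOffset;          // end offset of the local log

    boolean maybeGrantVote(int candidateEpoch, int candidateId,
                           int candidateLastEpoch, long candidateLastOffset) {
        // 1. Reject if a larger epoch is already known.
        if (candidateEpoch < currentEpoch) return false;

        // 2. If a vote was already cast in this epoch, only re-grant it to the same candidate.
        if (candidateEpoch == currentEpoch && votedCandidateId != -1)
            return candidateId == votedCandidateId;

        // 3a. Reject candidates outside the expected voter set (the voter would still
        //     accept the epoch bump and start its own election, not shown here).
        if (!voters.contains(candidateId)) return false;

        // 3b. Grant only if the candidate's log is at least as up-to-date as the local log,
        //     comparing (epoch, offset) pairs lexicographically.
        boolean candidateUpToDate = candidateLastEpoch > lastEpoch
            || (candidateLastEpoch == lastEpoch && candidateLastOffset >= lastEndOffset);
        if (!candidateUpToDate) return false;

        currentEpoch = candidateEpoch;
        votedCandidateId = candidateId;  // must be persisted to the quorum-state file before responding
        return true;
    }
}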

...

  1. First we need to check if the voter is still a candidate, because it may have already observed an election with a larger epoch; if it is no longer in the "candidate" state, it just ignores the response.
  2. Otherwise, it checks whether it has accumulated a majority of votes for this epoch. This information does not need to be persisted, since upon failover the candidate can just resend the vote request and the voters would grant it again as long as there is no candidate with a newer epoch. If it has a majority, it can transition to the leader state by updating the quorum-state file to indicate itself as the leader for the new epoch. Otherwise, it just records the vote in memory and does nothing.
  3. Upon becoming the leader, it will start by writing the current assignment state to its log so that its LastEpoch and LastEpochOffset are updated, and only then start sending out BeginQuorumEpoch requests. 

Writing the dummy entry with the new epoch is not necessary for correctness, but it is an optimization to reduce client request latency. Basically it allows for faster advancement of the high watermark following a leader election (the need for this is discussed in more detail below in the handling of the Fetch API). The dummy record will be a control record with Type=3. Below we define the schema: 

...

Note on Gridlocked Elections: It is possible for elections to fail. For example, this can happen if each voter becomes a candidate at the same time and votes for itself. Generally, if a voter fails to get a majority of votes before quorum.election.timeout.ms, then the vote is deemed to have failed, which will cause the candidate to bump the epoch, step down, and back off according to quorum.election.backoff.max.ms before retrying. Under some situations, a candidate can immediately detect when a vote has failed. For example, if there are only two voters and a candidate fails to get a vote from the other voter (i.e. VoteGranted is returned as false in the VoteResponse), then there is no need to wait for the election timeout. The candidate in this case can immediately step down and back off.
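
A minimal sketch of this candidate-side bookkeeping (hypothetical names, not the real implementation) could look like the following:

Code Block
import java.util.concurrent.ThreadLocalRandom;

public class CandidateStateExample {
    final int epoch;
    final int voterCount;
    final int majority;
    int grantedVotes = 1;    // the candidate always votes for itself
    int rejectedVotes = 0;

    CandidateStateExample(int epoch, int voterCount) {
        this.epoch = epoch;
        this.voterCount = voterCount;
        this.majority = voterCount / 2 + 1;
    }

    boolean isElected() {
        return grantedVotes >= majority;
    }

    // The election can be failed early once enough rejections have arrived that a
    // majority is no longer reachable, e.g. a single rejection with only two voters.
    boolean cannotBeElected() {
        return rejectedVotes > voterCount - majority;
    }

    // Random backoff before retrying, bounded by quorum.election.backoff.max.ms.
    static long electionBackoffMs(long backoffMaxMs) {
        return ThreadLocalRandom.current().nextLong(backoffMaxMs + 1);
    }
}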

...

The EndQuorumEpoch API is used by a leader to gracefully step down so that an election can be held immediately without waiting for the election timeout. The primary use case for this is to enable graceful shutdown. If the shutting down voter is either the active leader or a candidate (in case an election is in progress), then this request will be sent. It is also used when the leader needs to be removed from the quorum following an AlterPartitionReassignments request. 

The EndQuorumEpochRequest will be sent to all voters in the quorum. Inside each request, the leader will define the list of preferred successors sorted by each voter's current replicated offset in descending order. Based on the priority of the preferred successors, each voter will choose a corresponding delayed election time so that the most up-to-date voter has a higher chance of being elected. If a node's priority is highest, it will become a candidate immediately instead of waiting for the next poll. For a successor with priority N > 0, the next election timeout will be computed as:

...

Rather than following the Kafka approach, this protocol piggybacks log reconciliation on the Fetch API, which is more akin to Raft replication. When voters or leaders send a Fetch request, in addition to including an offset to fetch from, they also indicate the epoch of the last offset that they have in their local log (we refer to this as the "fetch epoch"). The leader always checks whether the fetch offset and fetch epoch are consistent with its own log. If they do not match, the fetch response will indicate the largest epoch and its end offset before the requested epoch. It is the responsibility of followers to detect leader changes and use Fetch responses to truncate the log.

The advantage of this approach is that it simplifies the follower state machine. Basically it just needs to send fetches to the current expected leader with the fetch offset. The response may indicate either a new leader to fetch from or a new fetch offset. It is also safer because the leader is able to validate the fetch offset on every request rather than relying on the follower to detect the epoch changes.

Since we are using the existing Fetch API, we believe it would make sense to change the normal replication protocol in a similar way. We will leave this as an opportunity for a follow-up KIP.

Extra condition on commitment: The Raft protocol requires the leader to only commit entries from any previous epoch if the same leader has already successfully replicated an entry from the current epoch. Kafka's ISR replication protocol suffers from a similar problem and handles it by not advancing the high watermark until the leader is able to write a message with its own epoch. The diagram below taken from the Raft dissertation illustrates the scenario:

...

The problem concerns the conditions for commitment and leader election. In this diagram, S1 is the initial leader and writes "2," but fails to commit it to all replicas. The leadership moves to S5 which writes "3," but also fails to commit it. Leadership then returns to S1 which proceeds to attempt to commit "2." Although "2" is successfully written to a majority of the nodes, the risk is that S5 could still become leader, which would lead to the truncation of "2" even though it is present on a majority of nodes.

In the protocol we are proposing here, we address this issue by not advancing the high watermark after an election until an entry has been written by the new leader with its own epoch. So we would allow the election of S5, which had written "3", even though a majority of nodes had written "2." This is allowable because "2" was not considered committed. Therefore, as long as we do not respond to the client's requests for the entries it added until the high watermark has advanced beyond them, this extra condition is satisfied.

This proposal follows Kafka's replication protocol. The high watermark for the quorum is not advanced until a record from the current epoch has been written. This is the purpose of the "dummy" leader change message that was mentioned in the section on Vote handling. Without this new message, it could be some time before a new entry is written, which could prevent recent appends from being committed.

As stated, we shall just reuse the existing Fetch API, with an additional FetchEpoch field to allow the leader to do the divergence check as necessary. In the long term, we would consider deprecating the OffsetForLeaderEpoch API in order to consolidate offset validation into the Fetch protocol. We would also add two new optional structs to the FetchResponse: NextFetchOffsetAndEpoch and CurrentLeader. The first is provided if the FetchRequest does not specify a valid offset and epoch; a follower handles this by attempting to truncate to the next offset specified in the response. The second is the current leader, which will always be provided for fetches from the metadata quorum, but is ignored in normal partition replication.
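
For illustration, here is a minimal sketch of how a follower could handle these two optional structs (the record types below are hypothetical stand-ins for the response fields, not the generated protocol classes):

Code Block
public class FollowerFetchExample {

    // Hypothetical stand-ins for the new optional structs in the FetchResponse.
    record OffsetAndEpoch(long offset, int epoch) {}
    record CurrentLeader(int leaderId, int leaderEpoch) {}
    record PartitionData(short errorCode,
                         OffsetAndEpoch nextFetchOffsetAndEpoch,  // null if the fetch position was valid
                         CurrentLeader currentLeader,
                         long highWatermark) {}

    interface ReplicatedLog {
        void truncateTo(long offset);   // discard local records at and above this offset
        long endOffset();
    }

    // If the leader reported a divergence point, truncate to it; in either case the
    // next fetch starts from the (possibly truncated) local end offset.
    static long nextFetchOffset(PartitionData data, ReplicatedLog log) {
        if (data.nextFetchOffsetAndEpoch() != null) {
            log.truncateTo(data.nextFetchOffsetAndEpoch().offset());
        }
        return log.endOffset();
    }
}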

There is a similar problem in the normal partition replication protocol. When a leader is elected, it cannot know the previous high watermark until the replicas in the ISR have fetched up to the offset at the start of the new epoch. In this case, if a follower is no longer active, the leader can always shrink the ISR to ensure that the high watermark does not remain stuck. In the Raft protocol, we do not have an ISR from which leaders are chosen; we rely on the highest epoch/offset in the log to elect leaders. In order to advance the high watermark after becoming leader, we need to know that there is no other replica that could become leader unless it has replicated all of the data that we commit. This is guaranteed as soon as the leader is able to commit an entry that it has written itself, since this is guaranteed to have a larger epoch than any previous entry. However, there is a risk that the client may not send any record immediately and that the high watermark could remain stuck at an offset which is lower than the log end offset. This is the purpose of the "dummy" leader change message that was mentioned in the section on Vote handling. After becoming leader, we immediately append this control record, which means that the high watermark can catch up to the end offset reliably.
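
A minimal sketch of this guard (hypothetical names; majorityEndOffset is an exclusive end offset) might look like:

Code Block
public class HighWatermarkGuardExample {

    // epochStartOffset: offset of the LeaderChange control record appended on election.
    // majorityEndOffset: largest end offset known to be replicated by a majority of voters.
    static long maybeAdvanceHighWatermark(long currentHighWatermark,
                                          long majorityEndOffset,
                                          long epochStartOffset) {
        // Only advance once a record from the current epoch has reached a majority.
        if (majorityEndOffset > epochStartOffset) {
            return Math.max(currentHighWatermark, majorityEndOffset);
        }
        return currentHighWatermark;  // stays put until the current epoch is replicated
    }
}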

Current Leader Discovery: This proposal also extends the Fetch response to include a separate field for the receiver of the request to indicate the current leader. This is intended to improve the latency to find the current leader, since otherwise a replica would need additional round trips. We believe that using the same mechanism would be helpful for normal partitions as well, since consumers have to use the Metadata API to find the current leader after receiving a NOT_LEADER_FOR_PARTITION error. However, this is outside the scope of this proposal.

Request Schema

Code Block
{
  "apiKey": 1,
  "type": "request",
  "name": "FetchRequest",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+",
      "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
    { "name": "MaxWaitTimeMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false,
      "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records" },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": false,
      "about": "The fetch session epoch, which is used for ordering requests in a session" },
    { "name": "Topics", "type": "[]FetchableTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "FetchPartitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "FetchOffset", "type": "int64", "versions": "0+",
          "about": "The message offset." },
		// ---------- Start new field ----------
        { "name": "FetchEpoch", "type": "int32", "versions": "12+",  
          "about": "The epoch of the last replicated record"},
		// ---------- End new field ----------
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": false,
          "about": "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower."},
        { "name": "MaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." }
      ]}
    ]},
    { "name": "Forgotten", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,
      "about": "In an incremental fetch request, the partitions to remove.", "fields": [
      { "name": "Name", "type": "string", "versions": "7+", "entityType": "topicName",
        "about": "The partition name." },
      { "name": "ForgottenPartitionIndexes", "type": "[]int32", "versions": "7+",
        "about": "The partitions indexes to forget." }
    ]},
    { "name": "RackId", "type":  "string", "versions": "11+", "default": "", "ignorable": true,
      "about": "Rack ID of the consumer making this request"}
  ]
}

...

When a leader receives a FetchRequest, it must check the following:

  1. First ensure that the leader epoch is the same. If not, reject this request with either the FENCED_LEADER_EPOCH or UNKNOWN_LEADER_EPOCH error.
    1. If the leader epoch is smaller, then eventually this leader's BeginQuorumEpoch would reach the voter and that voter would update the epoch.
    2. If the leader epoch is larger, then eventually the receiver would learn about the new epoch anyway. Actually this case should not happen since, unlike the normal partition replication protocol, leaders are always the first to discover that they have been elected.
  2. Check that the FetchEpoch for the FetchOffset is consistent with the leader's log. Specifically, we check that FetchOffset is less than or equal to the end offset of FetchEpoch. If not, return OFFSET_OUT_OF_RANGE and encode the next FetchOffset as the last offset of the largest epoch which is less than or equal to the fetcher's epoch. This is a heuristic that lets the voter truncate as much as possible and reach the divergence point with fewer Fetch round trips: if the fetcher's epoch is X, which does not match the epoch of the fetch offset, then all records of epoch X on that voter may have diverged and hence could be truncated, so returning the end offset of the largest epoch Y (< X) is reasonable. A sketch of this check follows the list.
  3. If the request is from a voter rather than an observer, the leader can possibly advance the high watermark. As stated above, we only advance the high watermark once the current leader has replicated at least one entry from its current epoch to a majority of the quorum. The high watermark is then set to the maximum offset which has been replicated to a majority of the voters.
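
Here is a minimal sketch of the check in step 2, assuming a hypothetical leader-side cache that maps each epoch to its end offset in the leader's log:

Code Block
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class FetchValidationExample {

    record OffsetAndEpoch(long offset, int epoch) {}

    // Hypothetical cache: epoch -> end offset of that epoch in the leader's log.
    private final NavigableMap<Integer, Long> epochEndOffsets = new TreeMap<>();

    // Returns null if the (fetchOffset, fetchEpoch) position is consistent with the
    // leader's log; otherwise returns the end offset of the largest epoch <= fetchEpoch,
    // which the follower should truncate to before retrying.
    OffsetAndEpoch validateFetchPosition(long fetchOffset, int fetchEpoch) {
        Long endOffset = epochEndOffsets.get(fetchEpoch);
        if (endOffset != null && fetchOffset <= endOffset) {
            return null;  // consistent: serve records starting at fetchOffset
        }
        Map.Entry<Integer, Long> floor = epochEndOffsets.floorEntry(fetchEpoch);
        if (floor == null) {
            return new OffsetAndEpoch(0L, 0);  // nothing in common, restart from the beginning
        }
        return new OffsetAndEpoch(floor.getValue(), floor.getKey());
    }
}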

...

  1. Start with a single node configured with broker.id=0 and quorum.voters=0.
  2. Start the node and verify quorum status. This would also be a good opportunity to make dynamic config changes or initialize security configurations.
  3. Add additional nodes to the cluster and verify that they can connect to the quorum.
  4. Finally use AlterPartitionReassignments to grow the quorum to the intended size.

...

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then backoff and retry with DiscoverBrokers and Metadata.
  3. Otherwise the response can be returned to the application, or the request eventually times out.

...

AlterPartitionReassignments

The AlterPartitionReassignments API was introduced in KIP-455 and is used by the admin client to change the replicas of a topic partition. It requires ALTER on CLUSTER permission. We propose to reuse it for Raft voter reassignment as well.

The effect of an AlterPartitionReassignments request is to change the TargetVoters field in the VoterAssignmentMessage defined above. Once this is done, the leader will begin the process of bringing the new nodes into the quorum and kicking out the nodes which are no longer needed.

Cancellation: If Replicas is set to null in the request, then effectively we will cancel an ongoing reassignment and leave the quorum with the current voters. The more preferable option is to always set the intended target voters. Note that it is always possible to send a new AlterPartitionReassignments request even if the pending reassignment has not finished. So if we are in the middle of a reassignment from (1, 2, 3) to (4, 5, 6), then the user can cancel the reassignment by resubmitting (1, 2, 3) as the target.
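
For example, a voter reassignment could be submitted (or cancelled) with the existing AdminClient API from KIP-455. The topic name __cluster_metadata below is only the name assumed by our prototype, and treating the request's Replicas as the target voters is specific to this proposal:

Code Block
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class QuorumReassignmentExample {
    public static void main(String[] args) throws Exception {
        // The metadata partition name used by the prototype; not a formal part of this proposal.
        TopicPartition metadataPartition = new TopicPartition("__cluster_metadata", 0);

        try (Admin admin = Admin.create(Map.<String, Object>of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {

            // Reassign the voters to (0, 3, 4): the Replicas of the request become the TargetVoters.
            admin.alterPartitionReassignments(Map.of(metadataPartition,
                    Optional.of(new NewPartitionReassignment(List.of(0, 3, 4)))))
                .all().get();

            // Cancel a pending reassignment by passing an empty Optional for the partition.
            admin.alterPartitionReassignments(Map.of(metadataPartition,
                    Optional.<NewPartitionReassignment>empty()))
                .all().get();
        }
    }
}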

AlterPartitionReassignments Request Handling

This request must be sent to the leader (which is the controller in KIP-500). Upon receiving the AlterPartitionReassignments request, the node will verify a couple of things:

  1. If the target node is not the leader of the quorum, return the NOT_LEADER_FOR_PARTITION error code to the admin client. 
  2. Check whether TargetVoters in the request matches that from the latest assignment. If so, check whether it has been committed. If it has, then return immediately. Otherwise go to step 4.
  3. If the TargetVoters in the request does not match the current TargetVoters, then append a new VoterAssignmentMessage to the log. 
  4. Return successfully only when the desired TargetVoters has been safely committed to the log.

After returning the result of the request, the leader will begin the process to modify the quorum. This is described in more detail below, but basically the leader will begin tracking the fetch state of the target voters. It will then make a sequence of single-movement alterations by appending new VoterAssignmentMessage records to the log.

...

AlterPartitionReassignments Response Handling

The current response handling of this API in the AdminClient is sufficient for our purpose:

  1. If the response error indicates that the intended node is not the current leader, then use the Metadata API to find the latest leader and retry.
  2. Otherwise return the successful response to the application.

Note that the AdminClient should retry the AlterPartitionReassignments request if it times out before the reassignment has been committed to the log. If the caller does not continue retrying the operation, then there is no guarantee about whether or not the reassignment was successfully received by the cluster.

...

This protocol allows arbitrary quorum changes through the AlterPartitionReassignments API. Internally, we arrive at the target quorum by making a sequence of single-member changes.

...

Reassignment Algorithm

Upon receiving an AlterPartitionReassignments request, the leader will do the following (a sketch of the single-member change computation follows the list):

  1. Append a VoterAssignmentMessage to the log with the current voters as CurrentVoters and the Replicas from the AlterPartitionReassignments request as the TargetVoters.
  2. Leader will compute 3 sets based on CurrentVoters and TargetVoters:
    1. RemovingVoters: voters to be removed
    2. RetainedVoters: voters shared between current and target
    3. NewVoters: voters to be added
  3. Based on comparison between size(NewVoters) and size(RemovingVoters),
    1. If size(NewVoters) >= size(RemovingVoters), pick one of NewVoters as NV by writing a record with CurrentVoters=CurrentVoters + NV, and TargetVoters=TargetVoters.
    2. else pick one of RemovingVoters as RV, preferably a non-leader voter, by writing a record with CurrentVoters=CurrentVoters - RV, and TargetVoters=TargetVoters.
  4. Once the record is committed, the membership change is safe to be applied. Note that followers will begin acting with the new voter information as soon as the log entry has been appended. They do not wait for it to be committed.
  5. As there is a potential delay in propagating the removal message to the voter being removed, we piggyback on the Fetch response to inform the voter to downgrade immediately after the new membership gets committed. See the error code NOT_FOLLOWER.
  6. The leader will continue this process until one of the following scenarios happens:
    1. If TargetVoters = CurrentVoters, then the reassignment is done. The leader will append a new entry with TargetVoters=null to the log.
    2. If the leader is the last remaining node in RemovingVoters, then it will step down by sending EndQuorumEpoch to the current voters. It will continue as a voter until the next leader removes it from the quorum.
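
Here is a minimal sketch of how the next single-member change could be computed from CurrentVoters and TargetVoters (steps 2 and 3 above); the names are hypothetical and this is not the actual implementation:

Code Block
import java.util.HashSet;
import java.util.Set;

public class SingleMemberChangeExample {

    // Computes the next quorum after one single-member change, per steps 2 and 3 above.
    static Set<Integer> nextCurrentVoters(Set<Integer> currentVoters,
                                          Set<Integer> targetVoters,
                                          int leaderId) {
        Set<Integer> newVoters = new HashSet<>(targetVoters);
        newVoters.removeAll(currentVoters);        // voters to be added
        Set<Integer> removingVoters = new HashSet<>(currentVoters);
        removingVoters.removeAll(targetVoters);    // voters to be removed

        Set<Integer> next = new HashSet<>(currentVoters);
        if (!newVoters.isEmpty() && newVoters.size() >= removingVoters.size()) {
            // CurrentVoters + NV: add one of the new voters.
            next.add(newVoters.iterator().next());
        } else if (!removingVoters.isEmpty()) {
            // CurrentVoters - RV: remove one of the removing voters, preferring a non-leader.
            int toRemove = removingVoters.stream()
                    .filter(id -> id != leaderId)
                    .findFirst()
                    .orElse(leaderId);
            next.remove(toRemove);
        }
        // If both sets are empty, next equals the target and the reassignment is done.
        return next;
    }
}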

...

To collect information effectively in one place and keep the membership change logic centralized, a leader-based approach is preferable. However, tracking the progress of an observer should only happen during reassignment, which is also why we may see incomplete log status from the DescribeQuorum API when the cluster is stable.

Reassignment Admin Tools

In order to support the DescribeQuorum API, we will add the following API to the AdminClient:

Code Block
KafkaAdminClient.java
public QuorumInfo describeQuorum(Set<TopicPartition> topicPartitions, DescribeQuorumOptions options);


The recommended usage is to first use the describeQuorum API to get the existing cluster status and learn whether there is any ongoing reassignment. The QuorumInfo struct is derived from the DescribeQuorumResponse:

Code Block
KafkaAdminClient.java
public class QuorumInfo {
	
	Map<Integer, VoterDescription> currentVoters();

	Map<Integer, VoterDescription> targetVoters();

	Map<Integer, VoterDescription> observers();

	Optional<Integer> leaderId();

    int leaderEpoch();

	long highWatermark();
 
	public static class VoterDescription {
		Optional<Long> logEndOffset();
        Optional<Long> lastCaughtUpTimeMs();
	}
}

The epoch and offsets are set as optional because the leader may not actively maintain all the observers' status.
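
As a usage sketch of the proposed API (this relies on the describeQuorum method and QuorumInfo class defined above, which do not exist in the AdminClient today), an admin tool could check for an ongoing reassignment as follows:

Code Block
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class DescribeQuorumUsageExample {

    // Relies on the proposed describeQuorum API and QuorumInfo class defined above.
    static void printQuorumStatus(KafkaAdminClient admin) {
        QuorumInfo info = admin.describeQuorum(
                Set.of(new TopicPartition("__cluster_metadata", 0)),
                new DescribeQuorumOptions());

        System.out.println("Leader: " + info.leaderId().orElse(-1)
                + ", epoch: " + info.leaderEpoch()
                + ", high watermark: " + info.highWatermark());

        // A non-empty target voter set indicates that a reassignment is still in progress.
        if (!info.targetVoters().isEmpty()) {
            System.out.println("Ongoing reassignment to voters: " + info.targetVoters().keySet());
        }
    }
}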


Tooling Support

We will add a new utility called kafka-metadata-quorum.sh to describe and alter quorum state. As usual, this tool will require --bootstrap-server to be provided.  We will support the following options:

...

Code Block
> bin/kafka-metadata-quorum.sh --describe
LeaderId:				0
LeaderEpoch: 			15
HighWatermark:			234130
MaxFollowerLag: 		34
MaxFollowerLagTimeMs:	15
CurrentVoters:			[0, 1, 2]
TargetVoters:			[0, 3, 4]

> bin/kafka-metadata-quorum.sh --describe replication
ReplicaId	LogEndOffset	Lag		LagTimeMs	Status		IsReassignTarget
0			234134			0		0			Leader		Yes
1			234130			4		10			Follower	No
2			234100			34		15			Follower	No
3			234124			10		12			Observer	Yes
4			234130			4		15			Observer	Yes

Altering Voters

Initially, the only purpose of this API is to perform reassignments. In the future, there may be additional uses. Below is an example usage:

Code Block
> bin/kafka-metadata-quorum.sh --alter --voters 0,3,4
CurrentVoters:			[0, 1, 2]
TargetVoters:			[0, 3, 4]

Once a reassignment has been submitted, users can use one of the --describe options to monitor progress. We intend to use the current kafka-reassign-partition.sh tool in order to make assignment changes to the metadata partition.

Metrics

Here’s a list of proposed metrics for this new protocol:

...

The goal for the Raft quorum is to replace the Zookeeper dependency and reach higher performance for metadata operations. In the first version, we will build the necessary metrics to monitor the end-to-end latency from an admin request (AlterPartitionReassignments) or client request being accepted to being committed. We shall monitor the time spent locally, primarily the time to fsync the new records and the time to apply changes to the state machine, which may not be a trivial operation. We shall also monitor the time used to propagate changes remotely, i.e. the latency to advance the high watermark. Benchmarks will also be built to compare the efficiency of a 3-node broker cluster using Zookeeper vs Raft under a heavy load of metadata changes. We shall also explore existing distributed consensus system load frameworks at the same time, but this may be beyond the scope of KIP-595. 

...