
...

  • bootstrap.servers:  Defines the set of servers to contact to get quorum information. This does not have to address quorum members directly. For example, it could be a VIP.
  • quorum.voters: Defines the ids of the expected voters. This is only required when bootstrapping the cluster for the first time. Once the cluster (and hence the quorum) has started, new brokers will rely on DiscoverBrokers (described below) to discover the current voters of the quorum.
  • quorum.fetch.timeout.ms: Maximum time without a successful fetch from the current leader before a new election is started.
  • quorum.election.timeout.ms: Maximum time without collecting a majority of votes during the candidate state before a new election is retried.
  • quorum.election.jitter.max.ms: Maximum random jitter after an election timeout before a new election is triggered.
  • quorum.request.timeout.ms: Maximum time before a pending request is considered failed and the connection is dropped.
  • quorum.retry.backoff.ms: Initial delay between request retries.
  • quorum.retry.backoff.max.ms: Maximum delay between requests. The backoff increases exponentially, starting from quorum.retry.backoff.ms (the same as in KIP-580).
  • broker.id: The existing broker id config shall be used as the voter id in the Raft quorum.
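The retry settings above can be illustrated with a small calculation. The following is a minimal sketch, assuming the delay starts at quorum.retry.backoff.ms, doubles on each retry, and is capped at quorum.retry.backoff.max.ms. Random jitter is omitted for clarity, and the class name is hypothetical.

```java
/**
 * Hypothetical helper illustrating the retry backoff configured by
 * quorum.retry.backoff.ms and quorum.retry.backoff.max.ms.
 * Assumption: the delay doubles per attempt and is capped; no jitter.
 */
final class QuorumRetryBackoff {
    private final long initialBackoffMs; // quorum.retry.backoff.ms
    private final long maxBackoffMs;     // quorum.retry.backoff.max.ms

    QuorumRetryBackoff(long initialBackoffMs, long maxBackoffMs) {
        if (initialBackoffMs <= 0 || maxBackoffMs < initialBackoffMs) {
            throw new IllegalArgumentException("require 0 < initial <= max");
        }
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Delay before the given retry (0-based): initial * 2^attempt, capped at the maximum. */
    long backoffMs(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = initialBackoffMs;
        for (int i = 0; i < attempt && delay < maxBackoffMs; i++) {
            // Jump straight to the cap instead of doubling past it (this also avoids overflow).
            delay = delay > maxBackoffMs / 2 ? maxBackoffMs : delay * 2;
        }
        return delay;
    }
}
```

With quorum.retry.backoff.ms=100 and quorum.retry.backoff.max.ms=1000, successive retries would wait 100, 200, 400 and 800 ms, and then 1000 ms for every later retry.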

...

  • Vote: Sent by a voter to initiate an election.
  • BeginQuorumEpoch: Used by a new leader to inform the voters of its status.
  • EndQuorumEpoch: Used by a leader to gracefully step down and allow a new election.
  • FetchQuorumRecords: Sent by voters and observers to the leader in order to replicate the log.
  • DiscoverBrokers: Used to discover or view current quorum state when bootstrapping a broker.

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "VoteRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      { "name": "Topics", "type": "[]VoteTopicRequest", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]VotePartitionRequest", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
              {"name": "CandidateEpoch", "type": "int32", "versions": "0+",
              "about": "The bumped epoch of the candidate sending the request"},
              {"name": "CandidateId", "type": "int32", "versions": "0+",
              "about": "The ID of the voter sending the request"},
              {"name": "LastEpoch", "type": "int32", "versions": "0+",
              "about": "The epoch of the last record written to the metadata log"},
              {"name": "LastEpochEndOffset", "type": "int64", "versions": "0+",
              "about": "The offset of the last record written to the metadata log"}
            ]
          }
        ]
      }
  ]
}
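The VoteRequest fields carry what a voter needs to apply the standard Raft vote-granting rule. The sketch below assumes that rule: reject stale epochs, grant at most one vote per epoch, and grant only to a candidate whose log, compared by LastEpoch and then LastEpochEndOffset, is at least as up to date as the voter's own. The class and method names are hypothetical. The vote is kept only in memory here, even though a real voter would have to persist it (for example in the quorum-state file) before responding.

```java
import java.util.OptionalInt;

/** Hypothetical in-memory voter state used to decide VoteRequests (standard Raft rule). */
final class VoteDecider {
    private int currentEpoch;
    private OptionalInt votedFor = OptionalInt.empty(); // vote cast in currentEpoch, if any
    private final int localLastEpoch;                  // epoch of the last local record
    private final long localLastEpochEndOffset;        // offset of the last local record

    VoteDecider(int currentEpoch, int localLastEpoch, long localLastEpochEndOffset) {
        this.currentEpoch = currentEpoch;
        this.localLastEpoch = localLastEpoch;
        this.localLastEpochEndOffset = localLastEpochEndOffset;
    }

    /** Returns the VoteGranted value for one VotePartitionRequest. */
    boolean handleVote(int candidateEpoch, int candidateId, int lastEpoch, long lastEpochEndOffset) {
        if (candidateEpoch < currentEpoch) {
            return false; // the candidate is behind the epoch this voter already knows
        }
        if (candidateEpoch > currentEpoch) {
            currentEpoch = candidateEpoch; // newer epoch: forget any earlier vote
            votedFor = OptionalInt.empty();
        }
        if (votedFor.isPresent()) {
            return votedFor.getAsInt() == candidateId; // at most one vote per epoch
        }
        boolean candidateLogUpToDate = lastEpoch > localLastEpoch
            || (lastEpoch == localLastEpoch && lastEpochEndOffset >= localLastEpochEndOffset);
        if (!candidateLogUpToDate) {
            return false;
        }
        votedFor = OptionalInt.of(candidateId);
        return true;
    }

    int currentEpoch() {
        return currentEpoch;
    }
}
```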

...

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "VoteResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      { "name": "Topics", "type": "[]VoteTopicResponse",
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]VotePartitionResponse", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
              {"name": "ErrorCode", "type": "int16", "versions": "0+"},
              {"name": "LeaderId", "type": "int32", "versions": "0+",
              "about": "The ID of the current leader or -1 if the leader is unknown."},
              {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
              "about": "The latest known leader epoch"},
              {"name": "VoteGranted", "type": "bool", "versions": "0+",
              "about": "True if the vote was granted and false otherwise"}
            ]
          }
        ]
      }
  ]
}
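On the candidate side, the VoteGranted flags in VoteResponse can be tallied against the voter set. The following is a minimal sketch. It assumes the candidate counts its own vote, ignores responses from non-voters and duplicate responses, and treats the election as won once a strict majority has granted. The class name is hypothetical. Handling a higher LeaderEpoch in the response, which should make the candidate step down, is left out.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tally of VoteResponse results for a single candidate epoch. */
final class ElectionTally {
    private final Set<Integer> voters;
    private final Set<Integer> granted = new HashSet<>();
    private final Set<Integer> rejected = new HashSet<>();

    ElectionTally(int localId, Set<Integer> voters) {
        if (!voters.contains(localId)) {
            throw new IllegalArgumentException("only voters can become candidates");
        }
        this.voters = new HashSet<>(voters);
        granted.add(localId); // the candidate votes for itself
    }

    /** Records one VoteResponse; non-voters and repeated responses are ignored. */
    void record(int voterId, boolean voteGranted) {
        if (!voters.contains(voterId) || granted.contains(voterId) || rejected.contains(voterId)) {
            return;
        }
        if (voteGranted) {
            granted.add(voterId);
        } else {
            rejected.add(voterId);
        }
    }

    int majority() {
        return voters.size() / 2 + 1;
    }

    boolean isWon() {
        return granted.size() >= majority();
    }

    /** Lost once the voters that have not rejected can no longer form a majority. */
    boolean isLost() {
        return voters.size() - rejected.size() < majority();
    }
}
```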

...

Note that only voters receive the BeginQuorumEpoch request: observers will discover the new leader through either the DiscoverBrokers or FetchQuorumRecords APIs. For example, the old leader would return an error code in the FetchQuorumRecords response indicating that it is no longer the leader, along with the currently known leader id / epoch, so that observers can start fetching from the new leader. If the old leader does not know who the new leader is, observers can still fall back to a DiscoverBrokers request to discover the new leader.

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      { "name": "Topics", "type": "[]BeginQuorumTopicRequest", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]BeginQuorumPartitionRequest", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
              {"name": "LeaderId", "type": "int32", "versions": "0+",
                "about": "The ID of the newly elected leader"},
              {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
                "about": "The epoch of the newly elected leader"}
            ]
          }
        ]
      }
  ]
}

...

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "BeginQuorumEpochResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      { "name": "Topics", "type": "[]BeginQuorumTopicResponse", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]BeginQuorumPartitionResponse", 
            "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
            {"name": "ErrorCode", "type": "int16", "versions": "0+"},
            {"name": "LeaderId", "type": "int32", "versions": "0+",
             "about": "The ID of the current leader or -1 if the leader is unknown."},
            {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
             "about": "The latest known leader epoch"}
            ]
          }
        ]
      }
  ]
}

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "EndQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
    {"name": "ClusterId", "type": "string", "versions": "0+"},
    { "name": "Topics", "type": "[]EndQuorumTopicRequest",
      "versions": "0+", "fields": [
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]EndQuorumPartitionRequest", 
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },    
            {"name": "ReplicaId", "type": "int32", "versions": "0+",
            "about": "The ID of the replica sending this request"},
            {"name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The current leader ID or -1 if there is a vote in progress"},
            {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The current epoch"},
            {"name": "PreferredSuccessors", "type": "[]int32", "versions": "0+",
            "about": "A sorted list of preferred successors to start the election"}
          ]
       }
      ]
    }
  ]
}
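One possible way for a voter to use PreferredSuccessors is sketched here purely as an assumption. A voter receiving EndQuorumEpoch could delay its own election in proportion to its position in the list, so that the most preferred successor usually starts first and voters absent from the list wait longest. The class, the method, and the baseDelayMs parameter are hypothetical and not part of this proposal.

```java
import java.util.List;

/** Hypothetical election delay derived from an EndQuorumEpochRequest's PreferredSuccessors. */
final class SuccessorElectionDelay {
    private SuccessorElectionDelay() {}

    /**
     * Position 0 (most preferred) waits 0 ms, position 1 waits baseDelayMs, and so on.
     * Voters not in the list wait as if they were ranked after every listed successor.
     */
    static long electionDelayMs(int localId, List<Integer> preferredSuccessors, long baseDelayMs) {
        int position = preferredSuccessors.indexOf(localId); // -1 if not listed
        int rank = position >= 0 ? position : preferredSuccessors.size();
        return rank * baseDelayMs;
    }
}
```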

...

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "EndQuorumEpochResponse",
  "validVersions": "0",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      { "name": "Topics", "type": "[]EndQuorumTopicResponse", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]EndQuorumPartitionResponse", 
            "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
            {"name": "ErrorCode", "type": "int16", "versions": "0+"},
            {"name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
            {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
            ]
          }
        ]
      }
  ]
}

...

  1. If the response contains the FENCED_LEADER_EPOCH error code, check the leaderId from the response. If it is defined, then update the quorum-state file and become a "follower" (which may or may not have voting power) of that leader. Otherwise, try to learn about the new leader via DiscoverBrokers requests; in the meantime, it may receive a BeginQuorumEpoch request, which would also update the epoch / leader.
  2. If the response contains the OUT_OF_RANGE error code, truncate the local log to the encoded nextFetchOffset, and then resend the FetchQuorumRecords request.
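The two error-handling steps above can be sketched as a small dispatch. The enum, the FetchingReplica callback interface, and all method names are hypothetical. leaderId uses -1 for "unknown", matching the convention of the response schemas above. The success path (appending fetched records) is not covered.

```java
/** Hypothetical subset of error codes relevant to FetchQuorumRecords response handling. */
enum QuorumFetchError { NONE, FENCED_LEADER_EPOCH, OUT_OF_RANGE }

/** Hypothetical actions a fetching replica can take; names are illustrative only. */
interface FetchingReplica {
    void becomeFollower(int leaderId, int leaderEpoch); // update quorum-state, then follow
    void sendDiscoverBrokers();                          // leader unknown: keep discovering
    void truncateTo(long offset);                        // drop local records from this offset on
    void resendFetch();                                  // retry FetchQuorumRecords
}

final class FetchResponseHandling {
    private FetchResponseHandling() {}

    static void handle(QuorumFetchError error, int leaderId, int leaderEpoch,
                       long nextFetchOffset, FetchingReplica replica) {
        switch (error) {
            case FENCED_LEADER_EPOCH:
                if (leaderId >= 0) {
                    replica.becomeFollower(leaderId, leaderEpoch); // step 1: leader is known
                } else {
                    replica.sendDiscoverBrokers(); // step 1: a BeginQuorumEpoch may also arrive meanwhile
                }
                break;
            case OUT_OF_RANGE:
                replica.truncateTo(nextFetchOffset); // step 2
                replica.resendFetch();
                break;
            default:
                break; // NONE: appending fetched records is outside this sketch
        }
    }
}
```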

...

To resolve this issue, we will piggy-back on the "quorum.fetch.timeout.ms" config: if the leader does not receive FetchQuorumRecords requests from a majority of the quorum for that amount of time, it will start sending DiscoverBrokers requests to random nodes in the cluster. If a returned response includes a leader with a newer epoch, this zombie leader will step down and become an observer; and if it realizes that it is still within the current quorum's voter list, it will start fetching from that leader. Note that the node will remain a leader until it finds that it has been supplanted by another voter.
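The zombie-leader check can be sketched as follows. This sketch assumes two things. First, the leader counts itself toward the majority. Second, every other voter is treated as freshly seen at the moment of election, so a new leader does not start probing immediately. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical tracker a leader could use to detect that it may have been superseded. */
final class LeaderFetchTracker {
    private final int localId;
    private final Set<Integer> voters;
    private final long fetchTimeoutMs; // quorum.fetch.timeout.ms
    private final Map<Integer, Long> lastFetchMs = new HashMap<>();

    LeaderFetchTracker(int localId, Set<Integer> voters, long fetchTimeoutMs, long electedAtMs) {
        this.localId = localId;
        this.voters = new HashSet<>(voters);
        this.fetchTimeoutMs = fetchTimeoutMs;
        for (int voter : voters) {
            if (voter != localId) {
                lastFetchMs.put(voter, electedAtMs); // grace period starts at election
            }
        }
    }

    /** Called for each FetchQuorumRecords request received from another voter. */
    void onFetch(int replicaId, long nowMs) {
        if (replicaId != localId && voters.contains(replicaId)) {
            lastFetchMs.put(replicaId, nowMs);
        }
    }

    /** True if fewer than a majority (leader included) fetched within the timeout. */
    boolean shouldSendDiscoverBrokers(long nowMs) {
        int recent = 1; // the leader itself
        for (long last : lastFetchMs.values()) {
            if (nowMs - last <= fetchTimeoutMs) {
                recent++;
            }
        }
        return recent < voters.size() / 2 + 1;
    }
}
```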

...

On Performance: There are also tradeoffs between these two models from a performance perspective. In order to commit a record, it must be replicated to a majority of nodes. This includes both propagating the record data from the leader to the follower and propagating the successful write of that record from the follower back to the leader. The push model potentially has an edge in latency because it allows for pipelining: the leader can continue sending new records to followers while it awaits the commitment of previous records. In the proposal here, we do not pipeline because the leader relies on getting the next offset from the FetchQuorumRecords request. However, this is not a fundamental limitation. If the leader keeps track of the last sent offset, then we could instead let the FetchQuorumRecords request be pipelined so that it indicates the last acked offset and allows the leader to choose the next offset to send. Basically, rather than letting the leader keep sending append requests to the follower as new data arrives, the follower would instead keep sending fetch requests as long as the acked offset is changing. Our experience with Kafka replication suggests that this is unlikely to be necessary, so we prefer the simplicity of the current approach, which also allows us to reuse more of the existing log layer in Kafka. However, we will evaluate the performance characteristics and make improvements as necessary. Note that although we are specifying the initial versions of the protocols in this KIP, there will almost certainly be additional revisions before this reaches production.

...

DiscoverBrokers

The DiscoverBrokers API is primarily used by new brokers to discover the connection information of the current set of live brokers. When a broker first starts up, for example, it will send DiscoverBrokers requests to the bootstrap.servers. Once it discovers the current quorum membership and its own state, it will resume participation in elections. The DiscoverBrokers API also plays the crucial role of discovering connection information for existing nodes using a gossip approach. Brokers include their own connection information in the request and collect the connection information of the voters in the cluster from the response. Additionally, if one of the voters becomes unreachable at some point, then DiscoverBrokers can be used to find its new connection information (similar to how metadata refreshes are used in the client after a disconnect).

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "DiscoverBrokersRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+"},
      { "name": "BootTimestamp", "type": "int64", "versions": "0+"},
      { "name": "Host", "type": "string", "versions": "0+"},
      { "name": "Port", "type": "int32", "versions": "0+"}
   ]
}

...

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "DiscoverBrokersResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ErrorCode", "type": "int16", "versions": "0+"},
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      {"name": "Brokers", "type": "[]Broker", "versions": "0+",
       "about": "The current voters", "fields": [
        { "name": "VoterId", "type": "int32", "versions": "0+"},
        { "name": "BootTimestamp", "type": "int64", "versions": "0+"},
        { "name": "Host", "type": "string", "versions": "0+"},
        { "name": "Port", "type": "int32", "versions": "0+"}
      ]
      }
  ]
}


Note that only the host and port information are provided. We assume that the security protocol will be configured with security.inter.broker.protocol. Additionally, the listener that is advertised through this API is the same one specified by inter.broker.listener.name.

DiscoverBrokers Request Handling

When a broker receives a DiscoverBrokers request, it will do the following:

  1. If the ReplicaId from the request is greater than or equal to 0 and matches one of the existing voters (or target voters), update the local connection information for that node if either that node is not present in the cache or the BootTimestamp from the request is larger than the cached value.
  2. Respond with the latest known leader and epoch along with the connection information of all known voters.

Note on AdminClient Usage: This API is also used by the admin client in order to describe/alter quorum state. In this case, we will use a sentinel of -1 for ReplicaId. The broker will not inspect the values of the other fields in the request if the ReplicaId is less than 0, so we can choose reasonable sentinels for them as well (e.g. Host can be left as an empty string).
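Both the request handling rules and the response merging described next reduce to one monotonic cache update. The sketch below assumes an in-memory map keyed by broker id. It treats any non-negative ReplicaId as a real broker, since the admin client's sentinel is -1 while broker.id=0 is a valid id. An entry is kept only if it is new or carries a strictly larger BootTimestamp. BrokerEndpoint and BrokerConnectionCache are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical connection entry mirroring the Broker fields of DiscoverBrokersResponse. */
final class BrokerEndpoint {
    final int id;
    final long bootTimestamp;
    final String host;
    final int port;

    BrokerEndpoint(int id, long bootTimestamp, String host, int port) {
        this.id = id;
        this.bootTimestamp = bootTimestamp;
        this.host = host;
        this.port = port;
    }
}

/** Hypothetical gossip cache: entries only move forward in BootTimestamp. */
final class BrokerConnectionCache {
    private final Map<Integer, BrokerEndpoint> byId = new HashMap<>();

    /** Stores the entry if it is unknown or newer; returns true if the cache changed. */
    boolean maybeUpdate(BrokerEndpoint endpoint) {
        BrokerEndpoint cached = byId.get(endpoint.id);
        if (cached == null || endpoint.bootTimestamp > cached.bootTimestamp) {
            byId.put(endpoint.id, endpoint);
            return true;
        }
        return false;
    }

    /** Request handling step 1: only real voters (or target voters) are cached. */
    void onRequest(BrokerEndpoint sender, Set<Integer> voters, Set<Integer> targetVoters) {
        boolean realBroker = sender.id >= 0; // the admin client sends ReplicaId = -1
        if (realBroker && (voters.contains(sender.id) || targetVoters.contains(sender.id))) {
            maybeUpdate(sender);
        }
    }

    /** Response handling: merge every returned broker under the same rule. */
    void onResponse(Iterable<BrokerEndpoint> brokers) {
        for (BrokerEndpoint broker : brokers) {
            maybeUpdate(broker);
        }
    }

    BrokerEndpoint get(int id) {
        return byId.get(id);
    }
}
```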

DiscoverBrokers Response Handling

After receiving a DiscoverBrokers response, the node will update its local cache of connection information with the values from the response. As when handling requests, the node will only update cached connection information corresponding to the largest observed BootTimestamp.

Note on Connection Discovery with Gossip: This API is specified as a gossip protocol because the cluster must be able to propagate connection information even when there are insufficient voters available to establish a quorum. There must be some base layer that can spread enough initial metadata for voters to connect to each other and conduct elections. An alternative would be to require that connection information always be defined statically in configuration; however, this makes operations cumbersome in modern cloud environments where hosts change frequently. We have opted instead to embrace dynamic discovery.

Quorum Metadata 

To discover the current membership, the broker will rely on the Metadata RPC and then resume participation in elections. There is no actual change to the RPC for now, as we only need to find the single partition leader after the connection information has been discovered.

Quorum Bootstrapping

This section discusses the process for establishing the quorum when the cluster is first initialized and afterwards when nodes are restarted. 

When first initialized, we rely on the quorum.voters configuration to define the expected voters. Brokers will only rely on this when starting up if there is no quorum state written to the log and if the broker fails to discover an existing quorum through the DiscoverBrokers API. We suggest that a --bootstrap flag could be passed through kafka-server-start.sh when the cluster is first started to add some additional safety. In that case, we would only use the config when the flag is specified and otherwise expect to discover the quorum dynamically.

Assuming the broker fails to discover an existing quorum, it will then check its broker.id to see if it is expected to be one of the initial voters. If so, then it will immediately include itself in DiscoverBrokers responses. Until the quorum is established, brokers will send DiscoverBrokers to the nodes in bootstrap.servers and other discovered voters in order to find the other expected members. Once enough brokers are known, the brokers will begin a vote to elect the first leader.

Note that the DiscoverBrokers API is intended to be a gossip API. Quorum members use it to advertise their connection information on bootstrapping and to find new quorum members. All brokers use the latest quorum state in response to the DiscoverBrokers requests that they receive. The latest voter state is determined by looking at 1) the leader epoch, and 2) boot timestamps. Brokers only accept monotonically increasing updates to be cached locally. This approach will replace the current approach based on ZooKeeper ephemeral nodes for broker (de-)registration: broker registration will be done by a new broker gossiping its host information via DiscoverBrokersRequest. Broker de-registration will be done separately for voters and observers. For voters, we would first change the quorum to demote the shutting-down voters to observers of the quorum. Observers can shut themselves down directly after they have let the controller move their hosted partitions to other hosts.

Bootstrapping Procedure

To summarize, this is the procedure that brokers will follow on initialization:

  1. Upon starting up, each broker always tries to bootstrap its knowledge of the quorum by first reading the quorum-state file and then scanning forward from AppliedOffset to the end of the log to see if there are any changes to the quorum state. For newly started brokers, the log / file would be empty, so no previous knowledge can be restored.
  2. If, after step 1, there is already some known quorum state along with a leader / epoch, the broker will:
    1. Promote itself from observer to voter if it finds out that it is a voter for the epoch.
    2. Start sending FetchQuorumRecords requests to the current leader it knows (which may not actually be the latest epoch's leader).
  3. Otherwise, it will try to learn the quorum state by sending DiscoverBrokers to any other brokers inside the cluster via bootstrap.servers as the second option of quorum state discovery.
  4. Send out a MetadataRequest to the discovered brokers to find the current metadata partition leader.
  5. If even step 3 cannot find any quorum information – e.g. when there are no other brokers in the cluster, or there is a network partition preventing this broker from talking to others in the cluster – fall back to the third option of quorum state discovery by checking if it is among the brokers listed in quorum.voters.
    1. If so, then it will promote itself to voter state, add its own connection information to the cached quorum state, and return that in the DiscoverBrokers responses it sends to other brokers; otherwise, it stays in observer state.
    2. In either case, it continues to try to send DiscoverBrokers to all other brokers in the cluster via bootstrap.servers.
  6. For any voter, after it has learned about a majority of the voters in the expected quorum from DiscoverBrokers responses, it will begin a vote.
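The decision made on each pass through the procedure above can be condensed into a small function. This is a simplified sketch under several assumptions. The inputs are hypothetical flags summarizing what the broker has learned so far. Promotion from observer to voter is not modeled. "Majority" means a strict majority of quorum.voters, with the broker itself counted.

```java
import java.util.Set;

/** Hypothetical next step for a broker following the bootstrapping procedure. */
enum BootstrapStep { FETCH_FROM_LEADER, SEND_METADATA_REQUEST, SEND_DISCOVER_BROKERS, BEGIN_VOTE }

final class BootstrapPlanner {
    private BootstrapPlanner() {}

    /**
     * @param knownLeaderId     leader from quorum-state / log or a Metadata response, -1 if unknown
     * @param metadataAnswered  whether a MetadataRequest has already been answered without a leader
     * @param brokersDiscovered whether DiscoverBrokers has reached any other broker
     * @param localId           this broker's broker.id
     * @param quorumVoters      ids from the quorum.voters config
     * @param cachedVoterIds    voters whose connection information is already cached
     */
    static BootstrapStep nextStep(int knownLeaderId, boolean metadataAnswered, boolean brokersDiscovered,
                                  int localId, Set<Integer> quorumVoters, Set<Integer> cachedVoterIds) {
        if (knownLeaderId >= 0) {
            return BootstrapStep.FETCH_FROM_LEADER;       // step 2
        }
        if (brokersDiscovered && !metadataAnswered) {
            return BootstrapStep.SEND_METADATA_REQUEST;   // steps 3 and 4
        }
        if (quorumVoters.contains(localId)) {             // step 5: expected initial voter
            int known = 1;                                // itself
            for (int id : quorumVoters) {
                if (id != localId && cachedVoterIds.contains(id)) {
                    known++;
                }
            }
            if (known >= quorumVoters.size() / 2 + 1) {
                return BootstrapStep.BEGIN_VOTE;          // step 6
            }
        }
        return BootstrapStep.SEND_DISCOVER_BROKERS;       // keep gossiping via bootstrap.servers
    }
}
```

With a single-node configuration (broker.id=0, quorum.voters=0), the broker already forms a majority by itself and moves straight to BEGIN_VOTE.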

When a leader has been elected for the first time in a cluster (i.e. if the leader's log is empty), the first thing it will do is append a VoterAssignmentMessage (described in more detail below) which will contain quorum.voters as the initial CurrentVoters. Once this message has been persisted in the log, then we no longer need `quorum.voters` and users can safely roll the cluster without this config.

ClusterId generation: Note that the first leader is also responsible for providing the ClusterId field, which is part of the VoterAssignmentMessage. If the cluster is being migrated from ZooKeeper, then we expect to reuse the existing clusterId. If the cluster is starting for the first time, then a new one will be generated; in practice, it is the user's responsibility to guarantee that the newly generated clusterId is unique. Once this message has been committed to the log, the leader and all future leaders will strictly validate that this value matches the ClusterId provided in requests when receiving Vote, BeginQuorumEpoch, EndQuorumEpoch, and FetchQuorumRecords requests.

The leader will also append a LeaderChangeMessage as described in the VoteResponse handling above. This is not needed for correctness. It is just useful for debugging and to ensure that the high watermark always has an opportunity to advance after an election.

Bootstrapping Example

With this procedure in mind, a convenient way to initialize a cluster might be the following.

  1. Start with a single node configured with broker.id=0 and quorum.voters=0.
  2. Start the node and verify quorum status. This would also be a good opportunity to make dynamic config changes or initialize security configurations.
  3. Add additional nodes to the cluster and verify that they can connect to the quorum.
  4. Finally use AlterQuorum to grow the quorum to the intended size.

Quorum Reassignment

The protocol for changing the active voters is well-described in the Raft literature. The high-level idea is to use a new control record type to write quorum changes to the log. Once a quorum change has been written and committed to the log, then the quorum change can take effect.

This will be a new control record type with Type=2 in the key schema. Below we describe the message value schema used for the quorum change messages written to the log. 

Code Block
{
  "type": "message",
  "name": "VoterAssignmentMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+"},
      {"name": "CurrentVoters", "type": "[]Voter", "versions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]},
      {"name": "TargetVoters", "type": "[]Voter", "versions": "0+", "nullableVersions": "0+", "fields": [
        {"name": "VoterId", "type": "int32", "versions": "0+"}
      ]}
  ]
}
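To make the CurrentVoters and TargetVoters fields concrete, the following sketch models the message value in memory. It derives which voters the leader would need to bring into the quorum and which it would need to remove. The record name and helper methods are hypothetical. A null TargetVoters is read as "no reassignment pending", which matches the nullable field in the schema.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory view of a VoterAssignmentMessage value.
// targetVoters == null means no reassignment is pending.
record VoterAssignmentValue(String clusterId, Set<Integer> currentVoters, Set<Integer> targetVoters) {

    boolean reassignmentInProgress() {
        return targetVoters != null && !targetVoters.equals(currentVoters);
    }

    // Target voters that are not yet part of the current quorum.
    Set<Integer> votersToAdd() {
        Set<Integer> result = new TreeSet<>();
        if (targetVoters != null) {
            result.addAll(targetVoters);
            result.removeAll(currentVoters);
        }
        return result;
    }

    // Current voters that are not part of the target quorum.
    Set<Integer> votersToRemove() {
        Set<Integer> result = new TreeSet<>();
        if (targetVoters != null) {
            result.addAll(currentVoters);
            result.removeAll(targetVoters);
        }
        return result;
    }
}
```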

DescribeQuorum

The DescribeQuorum API is used by the admin client to show the status of the quorum. This includes showing the progress of a quorum reassignment and viewing the lag of followers and observers.

Unlike the DiscoverBrokers request, this API must be sent to the leader, which is the only node that would have lag information for all of the voters.

Request Schema

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "DescribeQuorumRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DescribeQuorumTopicRequest",
      "versions": "0+", "fields": [
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]DescribeQuorumPartitionRequest",
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." }
          ]
        }
      ]
    }
  ]
}

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DescribeQuorumTopicResponse",
      "versions": "0+", "fields": [
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]DescribeQuorumPartitionResponse",
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
            { "name": "ErrorCode", "type": "int16", "versions": "0+" },
            { "name": "LeaderId", "type": "int32", "versions": "0+",
              "about": "The ID of the current leader or -1 if the leader is unknown." },
            { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
              "about": "The latest known leader epoch" },
            { "name": "HighWatermark", "type": "int64", "versions": "0+" },
            { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
            { "name": "TargetVoters", "type": "[]ReplicaState", "versions": "0+" },
            { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
          ]
        }
      ]
    }
  ],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+" },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown" },
      { "name": "LastCaughtUpTimeMs", "type": "int64", "versions": "0+",
        "about": "The last time the replica was caught up to the high watermark" }
    ]}
  ]
}

DescribeQuorum Request Handling

This request is always sent to the leader node. We expect AdminClient to use DiscoverBrokers and Metadata in order to discover the current leader. Upon receiving the request, a node will do the following:

  1. First check whether the node is the leader. If not, then return an error to let the client retry with DiscoverBrokers. If the current leader is known to the receiving node, then include the LeaderId and LeaderEpoch in the response.
  2. Build the response using current assignment information and cached state about replication progress.
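The two handling steps can be sketched as follows. The sketch is limited to the leader check and the LeaderId/LeaderEpoch fields, and the per-replica state is left out. `QuorumError`, `DescribeQuorumResult` and `DescribeQuorumHandler` are hypothetical names. NOT_QUORUM_LEADER is the error name this proposal uses for AlterQuorum, and reusing it here is an assumption, as is leaving its numeric code unspecified.

```java
import java.util.OptionalInt;

// Hypothetical error enum; numeric error codes are not specified here.
enum QuorumError { NONE, NOT_QUORUM_LEADER }

// Hypothetical subset of one DescribeQuorumResponse partition entry.
record DescribeQuorumResult(QuorumError error, int leaderId, int leaderEpoch) {}

final class DescribeQuorumHandler {
    private final int localId;

    DescribeQuorumHandler(int localId) {
        this.localId = localId;
    }

    // knownLeaderId is empty when no leader is known, e.g. during an election.
    DescribeQuorumResult handle(OptionalInt knownLeaderId, int latestEpoch) {
        if (knownLeaderId.isPresent() && knownLeaderId.getAsInt() == localId) {
            // Step 2: this node is the leader and builds the full response.
            return new DescribeQuorumResult(QuorumError.NONE, localId, latestEpoch);
        }
        // Step 1: not the leader; report the leader if known, otherwise -1.
        return new DescribeQuorumResult(QuorumError.NOT_QUORUM_LEADER,
                knownLeaderId.orElse(-1), latestEpoch);
    }
}
```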

DescribeQuorum Response Handling

On handling the response, the admin client would do the following:

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then back off and retry with DiscoverBrokers and Metadata.
  3. Otherwise the response can be returned to the application, or the request eventually times out.
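The client-side steps above can be sketched as a bounded retry loop. In this sketch, `discoverLeader` stands in for the DiscoverBrokers and Metadata lookup and `sendTo` stands in for sending DescribeQuorum to a given node. Both are hypothetical callbacks, not AdminClient APIs, and the backoff delay is omitted for brevity.

```java
import java.util.function.IntFunction;
import java.util.function.IntSupplier;

// Hypothetical view of a response: whether the node rejected the request
// because it is not the leader, and the LeaderId it reported (-1 if unknown).
record QuorumResponse(boolean notLeader, int leaderId) {}

final class LeaderRetryLoop {
    static QuorumResponse call(IntSupplier discoverLeader,
                               IntFunction<QuorumResponse> sendTo,
                               int maxAttempts) {
        int target = -1;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (target < 0) {
                // Step 2: no known leader; rediscover (a real client would back off first).
                target = discoverLeader.getAsInt();
                if (target < 0) {
                    continue;
                }
            }
            QuorumResponse response = sendTo.apply(target);
            if (!response.notLeader()) {
                return response; // Step 3: hand the response back to the application.
            }
            // Step 1: follow the reported LeaderId, or rediscover if it is -1.
            target = response.leaderId();
        }
        throw new IllegalStateException("request timed out after " + maxAttempts + " attempts");
    }
}
```

In this design, each branch maps to one numbered step, and the attempt bound plays the role of the eventual request timeout.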

AlterQuorum

The AlterQuorum API is used by the admin client to reassign the voters of the quorum or cancel an ongoing reassignment. It requires ALTER on CLUSTER permission.

The effect of AlterQuorum is to change the TargetVoters field in the VoterAssignmentMessage defined above. Once this is done, the leader will begin the process of bringing the new nodes into the quorum and kicking out the nodes which are no longer needed.

Cancellation: If TargetVoters is set to null in the request, then effectively we will cancel an ongoing reassignment and leave the quorum with the current voters. The more preferable option is to always set the intended TargetVoters. Note that it is always possible to send a new AlterQuorum request even if the pending reassignment has not finished. So if we are in the middle of a reassignment from (1, 2, 3) to (4, 5, 6), then the user can cancel the reassignment by resubmitting (1, 2, 3) as the TargetVoters.
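The cancellation semantics can be sketched from the leader's side. The sketch assumes a simplified model in which CurrentVoters has not yet changed. A null target clears the pending reassignment. Any other target replaces it, even if the earlier reassignment is unfinished. Resubmitting the current voters leaves nothing to change. `PendingReassignment` is a hypothetical name.

```java
import java.util.Set;

// Hypothetical leader-side view of the pending reassignment
// (simplified: CurrentVoters is assumed not to have changed yet).
final class PendingReassignment {
    private final Set<Integer> currentVoters;
    private Set<Integer> targetVoters; // null: no reassignment pending

    PendingReassignment(Set<Integer> currentVoters) {
        this.currentVoters = Set.copyOf(currentVoters);
    }

    // Applies an AlterQuorum request: null cancels, anything else replaces
    // the pending target even if the previous reassignment is unfinished.
    void applyAlterQuorum(Set<Integer> requestedTarget) {
        targetVoters = requestedTarget == null ? null : Set.copyOf(requestedTarget);
    }

    // Resubmitting the current voters has the same effect as cancelling.
    boolean hasEffectiveReassignment() {
        return targetVoters != null && !targetVoters.equals(currentVoters);
    }

    Set<Integer> targetVoters() {
        return targetVoters;
    }
}
```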

Request Schema

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "AlterQuorumRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]AlterQuorumTopicRequest",
      "versions": "0+", "fields": [
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]AlterQuorumPartitionRequest",
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
            { "name": "TargetVoters", "type": "[]Voter", "versions": "0+",
              "nullableVersions": "0+", "default": "null",
              "about": "The target quorum, or null if this is a cancellation request",
              "fields": [
                { "name": "VoterId", "type": "int32", "versions": "0+" }
              ]
            }
          ]
        }
      ]
    }
  ]
}

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "AlterQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]AlterQuorumTopicResponse",
      "versions": "0+", "fields": [
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]AlterQuorumPartitionResponse",
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
            { "name": "ErrorCode", "type": "int16", "versions": "0+" }
          ]
        }
      ]
    }
  ]
}

AlterQuorum Request Handling

...

The response handling is similar to DescribeQuorum. The admin client would do the following:

  1. If the response indicates that the intended node is not the current leader, then check the response to see if the LeaderId has been set. If so, then attempt to retry the request with the new leader.
  2. If the current leader is not defined in the response (which could be the case if there is an election in progress), then back off and retry with DiscoverBrokers.
  3. Otherwise return the successful response to the application.

...

Code Block
languagejava
titleKafkaAdminClient.java
public QuorumInfo describeQuorum(Set<TopicPartition> topicPartitions, DescribeQuorumOptions options);

public AlterQuorumInfo alterQuorum(Map<TopicPartition, Set<Integer>> newVoterIdsByPartition, AlterQuorumOptions options);

...

When the admin client calls alterQuorum, the underlying thread will first send a DiscoverBrokersRequest to find the stable leader. If that call times out or the quorum is in the middle of an election, the call will fail and tell the user to retry. Once the leader is found, the AdminClient will send an AlterQuorumRequest to it. If the returned error is retriable, such as NOT_QUORUM_LEADER, the client will rediscover the quorum leader. For fatal errors such as authorization errors, the call will fail and report the error to the user.

...

We will add a new utility called kafka-metadata-quorum.sh to describe and alter quorum state. As usual, this tool will require --bootstrap-server to be provided. We will support the following options:

...

  • --describe status: provides a short summary of the quorum status.
  • --describe replication: provides detailed information about the status of replication.

Here are a couple examples:

Code Block
> bin/kafka-metadata-quorum.sh --describe
LeaderId:              0
LeaderEpoch:           15
HighWatermark:         234130
MaxFollowerLag:        34
MaxFollowerLagTimeMs:  15
CurrentVoters:         [0, 1, 2]
TargetVoters:          [0, 3, 4]

> bin/kafka-metadata-quorum.sh --describe replication
ReplicaId  LogEndOffset  Lag  LagTimeMs  Status    IsReassignTarget
0          234134        0    0          Leader    Yes
1          234130        4    10         Follower  No
2          234100        34   15         Follower  No
3          234124        10   12         Observer  Yes
4          234130        4    15         Observer  Yes
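
In this example, the Lag column appears to be the leader's log end offset minus each replica's LogEndOffset. For instance, 234134 - 234130 = 4 for replica 1. MaxFollowerLag in the summary matches the largest such lag among the followers (34 for replica 2). This reading comes from the sample numbers rather than from the proposal, and excluding observers from MaxFollowerLag is an assumption. The following sketch reproduces that arithmetic from ReplicaState values. `ReplicaLag` and `QuorumLag` are hypothetical names.

```java
import java.util.List;

// Hypothetical subset of the ReplicaState struct.
record ReplicaLag(int replicaId, long logEndOffset) {}

final class QuorumLag {
    // Lag as it appears in the example: leader log end offset minus the
    // replica's log end offset, or -1 when the replica's offset is unknown.
    static long lag(long leaderLogEndOffset, ReplicaLag replica) {
        return replica.logEndOffset() < 0 ? -1 : leaderLogEndOffset - replica.logEndOffset();
    }

    // Largest lag among the current voters other than the leader.
    static long maxFollowerLag(int leaderId, long leaderLogEndOffset, List<ReplicaLag> voters) {
        long max = 0;
        for (ReplicaLag replica : voters) {
            if (replica.replicaId() != leaderId) {
                max = Math.max(max, lag(leaderLogEndOffset, replica));
            }
        }
        return max;
    }
}
```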

...

Initially, the only purpose of this API is to perform reassignments. In the future, there may be additional uses. Below is an example usage:

Code Block
> bin/kafka-metadata-quorum.sh --alter --voters 0,3,4
CurrentVoters:			[0, 1, 2]
TargetVoters:			[0, 3, 4]
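
The `--voters 0,3,4` argument has to become the set of voter ids carried in TargetVoters. The following sketch shows one way the tool might parse it. Rejecting negative and duplicate ids is an assumption and is not stated in the proposal. `VotersArgument` is a hypothetical name.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical parser for the comma-separated --voters argument.
final class VotersArgument {
    static Set<Integer> parse(String value) {
        Set<Integer> voters = new LinkedHashSet<>();
        for (String part : value.split(",")) {
            // Integer.parseInt throws NumberFormatException for non-numeric ids.
            int id = Integer.parseInt(part.trim());
            if (id < 0) {
                throw new IllegalArgumentException("Voter ids must be non-negative: " + id);
            }
            if (!voters.add(id)) {
                throw new IllegalArgumentException("Duplicate voter id: " + id);
            }
        }
        return Collections.unmodifiableSet(voters);
    }
}
```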

...