...

Code Block
kafka-metadata-quorum --bootstrap-server <endpoints> add-controller --config controller.properties

...

Code Block
kafka-metadata-quorum --bootstrap-server <endpoints> remove-controller --replica-id <replica-id> --replica-uuid <replica-uuid>

Common Scenarios

To better illustrate this feature, this section describes two common scenarios that a Kafka administrator may perform.

...

Automatic endpoint and directory id discovery

To improve the usability of this feature, it would be beneficial for the leader of the KRaft cluster metadata partition to automatically rediscover the voters' endpoints. This makes it possible for the operator to update the endpoint of a voter without having to use the kafka-metadata-quorum tool. When a voter becomes a follower and discovers a new leader, it will always send an AddVoter RPC to the leader. This request will have the OverrideOnly flag set to true, which instructs the leader to apply the add voter operation only if the replica is already a voter and the endpoints are different. When a voter becomes a leader, it will also write an AddVoter record for itself if the endpoints have changed.

The directory id is handled differently. The quorum shouldn't automatically update the directory id, as a change is a sign that the disk was replaced. For the directory id, the leader will only override it if it was not previously set. This is useful when a cluster gets upgraded to a kraft.version greater than 1.
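
The override rules above can be sketched as follows. This is only an illustration under stated assumptions, not the KRaft implementation: the Voter dataclass, the resolve_override function, and the use of None for a directory id that was never set are hypothetical. The sketch also assumes that a request whose directory id conflicts with an already-set one is ignored entirely, which the text does not specify.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class Voter:
    replica_id: int
    directory_id: Optional[str]   # None models "not previously set" (assumption)
    endpoints: Tuple[str, ...]


def resolve_override(current: Optional[Voter], request: Voter) -> Optional[Voter]:
    """Return the entry the leader would write for an OverrideOnly request,
    or None when the voter set must stay unchanged."""
    if current is None:
        # OverrideOnly never adds a replica that is not already a voter.
        return None
    if current.directory_id is not None and current.directory_id != request.directory_id:
        # A set directory id is never overwritten automatically.
        # Assumption: the whole request is ignored in this case.
        return None
    directory_id = current.directory_id if current.directory_id is not None else request.directory_id
    updated = replace(current, endpoints=request.endpoints, directory_id=directory_id)
    # Only write a record when something actually changed.
    return updated if updated != current else None
```

With this shape, a voter that restarts on a new host name gets its endpoints refreshed, while a voter that comes back with a different directory id still requires an explicit operator action.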

High Watermark

As described in KIP-595, the high-watermark will be calculated using the fetch offset of the majority of the voters. When a replica is removed or added, it is possible for the calculated high-watermark to decrease. The leader will not allow the high-watermark to decrease and will guarantee that it is monotonically increasing for both the state machines and the remote replicas.

With this KIP, it is possible for the leader to not be part of the voter set when the replica removed is the leader. In this case the leader will continue to handle Fetch and FetchSnapshot requests as normal, but it will not count itself when computing the high watermark.
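
A small sketch of this calculation may help. It is an illustration only: the function names and the dictionary of fetch offsets keyed by replica id are hypothetical, and a voter with no recorded offset is assumed to be at offset 0.

```python
from typing import Dict, Set


def majority_offset(fetch_offsets: Dict[int, int], voters: Set[int]) -> int:
    """Largest offset that a majority of the current voters have reached."""
    if not voters:
        raise ValueError("the voter set must not be empty")
    # Only voters are considered, so a leader that was removed from the
    # voter set is not counted even if its offset is in fetch_offsets.
    offsets = sorted((fetch_offsets.get(v, 0) for v in voters), reverse=True)
    majority = len(voters) // 2 + 1
    return offsets[majority - 1]


def next_high_watermark(current_hw: int, fetch_offsets: Dict[int, int], voters: Set[int]) -> int:
    """Never let the high-watermark move backwards after a voter set change."""
    return max(current_hw, majority_offset(fetch_offsets, voters))
```

For example, with offsets {1: 100, 2: 90, 3: 50} and voters {1, 2, 3} the majority offset is 90. If voters 4 and 5 join at offset 0, the majority offset drops to 50, but next_high_watermark keeps reporting 90 until the new majority catches up.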

Snapshots

The snapshot generation code needs to be extended to include the new KRaft-specific control record for AddVoter. Before this KIP, the snapshot didn't include any KRaft-generated control records.

Internal Listener

The KRaft implementation and protocol described in KIP-595 and KIP-630 never read from the log or snapshot. This KIP requires that the KRaft implementation now read uncommitted data from the log and snapshot to discover the voter set. This also means that the KRaft implementation needs to handle this uncommitted state getting truncated and reverted.
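
One way to picture the truncate-and-revert requirement is an offset-indexed history of voter sets. The sketch below is hypothetical (the VoterSetHistory class and its methods are not part of the KIP) and assumes that each voter set change is recorded at the log offset of the control record that introduced it.

```python
from typing import FrozenSet, Iterable, List, Tuple


class VoterSetHistory:
    """Voter sets read from the log, including uncommitted ones."""

    def __init__(self, initial: Iterable[int]):
        self._initial: FrozenSet[int] = frozenset(initial)
        # (offset of the control record, voter set after applying it),
        # kept in strictly increasing offset order.
        self._entries: List[Tuple[int, FrozenSet[int]]] = []

    def append(self, offset: int, voters: Iterable[int]) -> None:
        if self._entries and offset <= self._entries[-1][0]:
            raise ValueError("offsets must be strictly increasing")
        self._entries.append((offset, frozenset(voters)))

    def latest(self) -> FrozenSet[int]:
        """The voter set in effect, committed or not."""
        return self._entries[-1][1] if self._entries else self._initial

    def truncate_to(self, end_offset: int) -> None:
        """Revert every change recorded at or after end_offset."""
        while self._entries and self._entries[-1][0] >= end_offset:
            self._entries.pop()
```

When the log is truncated, calling truncate_to with the new log end offset restores the voter set that was in effect before the discarded records.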

Public Interfaces

Configuration

There are only two configurations for this feature.

controller.quorum.voters

This is an existing configuration. This configuration describes the state of the quorum and will only be used if the kraft.version feature is 0.

controller.quorum.bootstrap.servers

This is a list of nodes that brokers and new controllers can use to discover the quorum leader. Brokers and new controllers (observers) will send Fetch requests to all of the nodes in this configuration until they discover the quorum leader and the Fetch request succeeds. The quorum voters and their configuration will be learned by fetching and reading the records from the log and snapshot. This includes committed and uncommitted records.

If this configuration is specified, observers will not use the controller.quorum.voters endpoints to discover the leader. 
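
The discovery loop described above can be sketched roughly as follows. This is a simplification under stated assumptions: discover_leader and the try_fetch callback are hypothetical names, try_fetch stands in for sending a Fetch request and returns True only when the request succeeds, and the sketch ignores any leader information that a failed Fetch response might carry.

```python
from itertools import cycle, islice
from typing import Callable, Optional, Sequence


def discover_leader(
    bootstrap_servers: Sequence[str],
    try_fetch: Callable[[str], bool],
    max_attempts: int,
) -> Optional[str]:
    """Cycle through the bootstrap servers until one Fetch succeeds."""
    # islice bounds the otherwise endless round-robin over the servers.
    for server in islice(cycle(bootstrap_servers), max_attempts):
        if try_fetch(server):
            return server
    return None
```

A real observer would keep retrying with backoff rather than giving up after a fixed number of attempts; the bound here only keeps the example finite.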

Log and Snapshot Control Records

Two new control records will be added to the log and snapshot of a KRaft partition.

...

Add an optional VoterUuid to Voter. This change is not needed for correctness but it is nice to have for tracing and debugging. The leader will write version 0 if the kraft.version is 0. The leader will write version 1 if the kraft.version is 1.

Code Block
git diff upstream/trunk clients/src/main/resources/common/message/LeaderChangeMessage.json
diff --git a/clients/src/main/resources/common/message/LeaderChangeMessage.json b/clients/src/main/resources/common/message/LeaderChangeMessage.json
index fdd7733388..2b019a2a80 100644
--- a/clients/src/main/resources/common/message/LeaderChangeMessage.json
+++ b/clients/src/main/resources/common/message/LeaderChangeMessage.json
@@ -16,7 +16,7 @@
 {
   "type": "data",
   "name": "LeaderChangeMessage",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "Version", "type": "int16", "versions": "0+",
@@ -30,7 +30,8 @@
   ],
   "commonStructs": [
     { "name": "Voter", "versions": "0+", "fields": [
-      {"name": "VoterId", "type": "int32", "versions": "0+"}
+      { "name": "VoterId", "type": "int32", "versions": "0+" },
+      { "name": "VoterUuid", "type": "uuid", "versions": "1+" }
     ]}
   ]
 }

AddVoterRecord

A control record for instructing the voters to add a new voter to the topic partition. This record can exist in both the log and the snapshot of a topic partition.

...

NAME                      TAGS               TYPE   NOTE
number-of-voters          type=raft-metrics  gauge  Number of voters for the cluster metadata topic partition.
number-of-observers       type=raft-metrics  gauge  Number of observers that could be promoted to voters.
pending-add-voter         type=raft-metrics  gauge  1 if there is a pending add voter operation, 0 otherwise.
pending-remove-voter      type=raft-metrics  gauge  1 if there is a pending remove voter operation, 0 otherwise.
TBD                       TBD                gauge  1 if a controller node is not a voter for the KRaft cluster metadata partition, 0 otherwise.
duplicate-voter-ids       type=raft-metrics  gauge  Number of duplicate replica ids in the set of voters.
number-of-offline-voters  type=raft-metrics  gauge  Number of voters with a last Fetch timestamp greater than the Fetch timeout.
ignored-static-voters     TBD                gauge  1 if controller.quorum.voters is set and the kraft.version is greater than 0, 0 otherwise.
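
To make two of these gauges concrete, here is a rough sketch of how their values could be derived. The function names and input shapes are hypothetical, and the sketch relies on two interpretations that the table does not spell out: duplicate-voter-ids counts the replica ids that appear more than once, and a voter is offline when the time elapsed since its last Fetch exceeds the Fetch timeout.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def duplicate_voter_ids(voters: Iterable[Tuple[int, str]]) -> int:
    """Count replica ids that occur more than once among (id, uuid) voters."""
    counts = Counter(replica_id for replica_id, _uuid in voters)
    return sum(1 for occurrences in counts.values() if occurrences > 1)


def number_of_offline_voters(
    last_fetch_ms: Dict[int, int], now_ms: int, fetch_timeout_ms: int
) -> int:
    """Count voters whose last Fetch is older than the Fetch timeout."""
    return sum(
        1 for timestamp in last_fetch_ms.values()
        if now_ms - timestamp > fetch_timeout_ms
    )
```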

Command Line Interface

kafka-metadata-shell.sh

...

The kafka-metadata-quorum tool described in KIP-595 and KIP-836 will be improved to support these additional commands:

...

describe

TODO: This command should print all of the voter endpoints that the leader knows about. It should also display if there are any uncommitted add or remove voter changes.

This command will print both the ReplicaId and ReplicaUuid for CurrentVoters. A new row called Observers will be added, which prints the replica ID and UUID of any replica that could be added to the voter set. E.g.

Code Block
> bin/kafka-metadata-quorum.sh --describe
ClusterId:              SomeClusterId
LeaderId:               0
LeaderEpoch:            15
HighWatermark:          234130
MaxFollowerLag:         34
MaxFollowerLagTimeMs:   15
CurrentVoters:          [{"id": 0, "uuid": "UUID1"}, {"id": 1, "uuid": "UUID2"}, {"id": 2, "uuid": "UUID2"}]
Observers:              [{"id": 3, "uuid": "UUID3"}]

describe --replication

This command will print an additional column for the replica uuid after the replica id. E.g.

Code Block
> bin/kafka-metadata-quorum.sh --describe replication
ReplicaId   ReplicaUuid   LogEndOffset   ...
0           uuid1         234134         ...
...

...

add-controller

This command is used to add new voters to the topic partition. The flags --replicaId replica-id and --replicaUuid replica-uuid must be specified. A future KIP will describe how the user can specify endpoint information for the replica.

...

remove-controller

This command is used to remove voters from the topic partition. The flags --replicaId replica-id and --replicaUuid replica-uuid must be specified.

Compatibility, Deprecation, and Migration Plan

...