This KIP proposes extending the communication between the controller and brokers to include enough information so that topic partition leaders know if they were elected because of an unclean leader election. This feature needs to support both the traditional ZooKeeper controller and the more recent KRaft controller.

When the topic partition leader recovers from an unclean leader election, it will abort all pending transactions. See KAFKA-7408 for more details.
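A minimal Python sketch of the leader-side recovery step described above. The class, its fields and the function name are hypothetical. Pending transactions are reduced to a set of producer IDs, and the returned dict only mirrors the AlterIsr partition fields defined in this KIP. The sketch also assumes that the leader keeps its local flag set until the controller acknowledges the AlterIsr request.

```python
from dataclasses import dataclass, field


@dataclass
class LeaderPartition:
    """Hypothetical leader-side view of one topic partition."""
    partition_index: int
    leader_id: int
    leader_epoch: int
    isr_version: int
    is_unclean: bool
    # Producer IDs with an open transaction (illustrative simplification).
    pending_transactions: set = field(default_factory=set)
    aborted_transactions: list = field(default_factory=list)


def recover_from_unclean_election(p: LeaderPartition) -> dict:
    """Abort all pending transactions, then build the AlterIsr partition
    entry that reports the recovery to the controller."""
    if not p.is_unclean:
        raise ValueError("leader was not elected uncleanly; nothing to recover")
    # Abort in producer-ID order so the result is deterministic.
    for producer_id in sorted(p.pending_transactions):
        p.aborted_transactions.append(producer_id)
    p.pending_transactions.clear()
    # The ISR still contains only the leader. It may grow only after the
    # controller has accepted NewIsUnclean = false (p.is_unclean stays True here).
    return {
        "PartitionIndex": p.partition_index,
        "LeaderEpoch": p.leader_epoch,
        "NewIsr": [p.leader_id],
        "NewIsUnclean": False,
        "CurrentIsrVersion": p.isr_version,
    }
```

Keeping the local flag set until the controller responds means that a lost or rejected AlterIsr request leaves the leader in the conservative state.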

ZooKeeper Controller

The controller will use ZooKeeper to persist the decision that it elected a leader through the unclean leader election strategy. This decision will get propagated to all of the affected brokers through the LeaderAndIsr request. Topic partition leaders will inform the controller that they have recovered from the unclean leader election using the AlterIsr request.
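A minimal Python sketch of this three-step flow: persist, propagate, then clear. A dict stands in for the ZooKeeper partition state znodes, and the class and method names are hypothetical. The sketch also assumes that the controller bumps the leader epoch on every election and rejects AlterIsr requests that carry a stale leader epoch.

```python
from dataclasses import dataclass


@dataclass
class ZkPartitionState:
    """Subset of the JSON stored in .../partitions/<partition>/state."""
    leader: int
    leader_epoch: int
    isr: list
    is_unclean: bool = False


class ControllerSketch:
    """Hypothetical in-memory stand-in for the ZooKeeper controller."""

    def __init__(self):
        self.znodes = {}  # (topic, partition) -> ZkPartitionState

    def elect_unclean_leader(self, topic, partition, new_leader):
        current = self.znodes[(topic, partition)]
        elected = ZkPartitionState(leader=new_leader,
                                   leader_epoch=current.leader_epoch + 1,
                                   isr=[new_leader], is_unclean=True)
        # 1. Persist the decision before telling any broker about it.
        self.znodes[(topic, partition)] = elected
        # 2. Propagate it through the LeaderAndIsr partition state.
        return {"Leader": elected.leader, "LeaderEpoch": elected.leader_epoch,
                "Isr": list(elected.isr), "IsUnclean": True}

    def handle_alter_isr(self, topic, partition, leader_epoch, new_isr, new_is_unclean):
        current = self.znodes[(topic, partition)]
        # Reject requests from a leader with a stale epoch.
        if leader_epoch != current.leader_epoch:
            raise ValueError("stale leader epoch")
        # 3. The leader reports recovery by sending NewIsUnclean = false.
        updated = ZkPartitionState(current.leader, current.leader_epoch,
                                   list(new_isr), new_is_unclean)
        self.znodes[(topic, partition)] = updated
        return updated
```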

KRaft Controller

The controller will persist the decision that it elected a leader through the unclean leader election strategy by writing a PartitionRecord or PartitionChangeRecord to the cluster metadata partition. These records are replicated to all of the brokers through that partition, as described in KIP-595, KIP-630 and KIP-631. Topic partition leaders will inform the controller that they have recovered from the unclean leader election using the AlterIsr request.
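A Python sketch of which PartitionChangeRecord the KRaft controller might write in each case. Plain dicts stand in for the generated record classes, and a string stands in for the topic UUID. The function names are hypothetical. The sketch assumes the tri-state IsUnclean encoding given for PartitionChangeRecord in this KIP (-1 unchanged, 0 elected from the ISR, 1 elected uncleanly), and that only changed fields are written.

```python
from typing import Optional

ELECTED_FROM_ISR = 0   # PartitionChangeRecord.IsUnclean values (-1 means unchanged)
ELECTED_UNCLEANLY = 1


def unclean_election_record(topic_id: str, partition_id: int, new_leader: int) -> dict:
    """Record written when the controller elects a leader outside the ISR."""
    return {"TopicId": topic_id, "PartitionId": partition_id,
            "Leader": new_leader, "Isr": [new_leader],
            "IsUnclean": ELECTED_UNCLEANLY}


def alter_isr_record(topic_id: str, partition_id: int,
                     current_isr: list, current_is_unclean: bool,
                     new_isr: list, new_is_unclean: bool) -> Optional[dict]:
    """Record for an accepted AlterIsr request, carrying only the fields that
    changed; returns None when nothing changed."""
    record = {"TopicId": topic_id, "PartitionId": partition_id}
    if list(new_isr) != list(current_isr):
        record["Isr"] = list(new_isr)
    if new_is_unclean != current_is_unclean:
        record["IsUnclean"] = ELECTED_UNCLEANLY if new_is_unclean else ELECTED_FROM_ISR
    return record if len(record) > 2 else None
```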

Changed Public Interfaces

ZooKeeper State

The state of the partition, stored as JSON in /brokers/topics/<topic>/partitions/<partition>/state, will be extended with a new property named is_unclean, giving the following schema:

Code Block
{
  "version": 1,
  "leader": NUMBER,
  "leader_epoch": NUMBER,
  "controller_epoch": NUMBER,
  "isr": ARRAY of NUMBER,
  "is_unclean": BOOLEAN // New property
}

The property is_unclean indicates whether the current leader was elected using the unclean leader election strategy. If the property is missing, its default value is false.
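A minimal Python sketch of reading and writing this znode with the standard json module. The function names are hypothetical. The point it shows is that state written by an older controller, which lacks is_unclean, reads back as false.

```python
import json


def read_partition_state(raw: str) -> dict:
    """Parse a partition state znode, treating a missing is_unclean as false."""
    state = json.loads(raw)
    state["is_unclean"] = bool(state.get("is_unclean", False))
    return state


def write_partition_state(leader: int, leader_epoch: int, controller_epoch: int,
                          isr: list, is_unclean: bool) -> str:
    """Serialize the partition state, including the new is_unclean property."""
    return json.dumps({
        "version": 1,
        "leader": leader,
        "leader_epoch": leader_epoch,
        "controller_epoch": controller_epoch,
        "isr": isr,
        "is_unclean": is_unclean,
    })
```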

LeaderAndIsr RPC

Request Schema


Code Block
{
  "apiKey": 4,
  "type": "request",
  "listeners": ["zkBroker"],
  "name": "LeaderAndIsrRequest",
  // Version 1 adds IsNew.
  //
  // Version 2 adds broker epoch and reorganizes the partitions by topic.
  //
  // Version 3 adds AddingReplicas and RemovingReplicas.
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds Topic ID and Type to the TopicStates, as described in KIP-516.
  //
  // Version 6 adds IsUnclean as described in KIP-704.
  "validVersions": "0-6",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The current controller ID." },
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The current controller epoch." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1",
      "about": "The current broker epoch." },
    { "name": "Type", "type": "int8", "versions": "5+",
      "about": "The type that indicates whether all topics are included in the request"},
    { "name": "UngroupedPartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "0-1",
      "about": "The state of each partition, in a v0 or v1 message." },
    // In v0 or v1 requests, each partition is listed alongside its topic name.
    // In v2+ requests, partitions are organized by topic, so that each topic name
    // only needs to be listed once.
    { "name": "TopicStates", "type": "[]LeaderAndIsrTopicState", "versions": "2+",
      "about": "Each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "5+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "PartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "2+",
        "about": "The state of each partition" }
    ]},
    { "name": "LiveLeaders", "type": "[]LeaderAndIsrLiveLeader", "versions": "0+",
      "about": "The current live leaders.", "fields": [
      { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The leader's broker ID." },
      { "name": "HostName", "type": "string", "versions": "0+",
        "about": "The leader's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The leader's port." }
    ]}
  ],
  "commonStructs": [
    { "name": "LeaderAndIsrPartitionState", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0-1", "entityType": "topicName", "ignorable": true,
        "about": "The topic name.  This is only present in v0 or v1." },
      { "name": "PartitionIndex", "type": "int32", "versions": "0+",
        "about": "The partition index." },
      { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
        "about": "The controller epoch." },
      { "name": "Leader", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The broker ID of the leader." },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
        "about": "The leader epoch." },
      { "name": "Isr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
        "about": "The in-sync replica IDs." },
      { "name": "ZkVersion", "type": "int32", "versions": "0+",
        "about": "The ZooKeeper version." },
      { "name": "Replicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
        "about": "The replica IDs." },
      { "name": "AddingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true, "entityType": "brokerId",
        "about": "The replica IDs that we are adding this partition to, or null if no replicas are being added." },
      { "name": "RemovingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true, "entityType": "brokerId",
        "about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
      { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
        "about": "Whether the replica should have existed on the broker or not." },
      // -------- Properties added by this KIP ---------
      { "name": "IsUnclean", "type": "bool", "versions": "6+", "default": "false", "ignorable": true,
        "about": "Whether the assigned leader was elected using the unclean leader election strategy." }
    ]}
  ]
}

AlterIsr RPC

Request Schema

Code Block
  {
    "apiKey": 56,
    "type": "request",
    "listeners": ["zkBroker", "controller"],
    "name": "AlterIsrRequest",
    "validVersions": "0-1",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The ID of the requesting broker" },
      { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
        "about": "The epoch of the requesting broker" },
      { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
        { "name":  "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The name of the topic to alter ISRs for" },
        { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index" },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The leader epoch of this partition" },
          { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
            "about": "The ISR for this partition"},
          // ----- Start of properties added by this KIP -----
          { "name": "NewIsUnclean", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
            "about": "Whether the leader is recovering from unclean leader election." },
          // ----- End of properties added by this KIP -----
          { "name": "CurrentIsrVersion", "type": "int32", "versions": "0+",
            "about": "The expected version of ISR which is being updated"}
        ]}
      ]}
    ]
  }

Response Schema

Code Block
  {
    "apiKey": 56,
    "type": "response",
    "name": "AlterIsrResponse",
    "validVersions": "0-1",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The top level response error code" },
      { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
        { "name":  "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The name of the topic" },
        { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index" },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The partition level error code" },
          { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
            "about": "The broker ID of the leader." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The leader epoch." },
          { "name": "Isr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
            "about": "The in-sync replica IDs." },
          // ----- Start of properties added by this KIP -----
          { "name": "IsUnclean", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
            "about": "Whether the assigned leader was elected using the unclean leader election strategy." },
          // ----- End of properties added by this KIP -----
          { "name": "CurrentIsrVersion", "type": "int32", "versions": "0+",
            "about": "The current ISR version." }
        ]}
      ]}
    ]
  }

Cluster Metadata Records

PartitionRecord

Add a property to indicate to the leader of the topic partition that it was elected using unclean leader election.

Code Block
  {
    "apiKey": 3,
    "type": "metadata",
    "name": "PartitionRecord",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The partition id." },
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique ID of this topic." },
      { "name": "Replicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
        "about": "The replicas of this partition, sorted by preferred order." },
      { "name": "Isr", "type": "[]int32", "versions": "0+",
        "about": "The in-sync replicas of this partition" },
      { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
        "about": "The replicas that we are in the process of removing." },
      { "name": "AddingReplicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
        "about": "The replicas that we are in the process of adding." },
      { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
        "about": "The lead replica, or -1 if there is no leader." },
      // ----- Start of properties added by this KIP -----
      { "name": "IsUnclean", "type": "bool", "default": "false", "versions": "0+", "taggedVersions": "0+", "tag": 0,
        "about": "Whether the assigned leader was elected using the unclean leader election strategy." },
      // ----- End of properties added by this KIP -----
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The epoch of the partition leader." },
      { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "An epoch that gets incremented each time we change anything in the partition." }
    ]
  }
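Because IsUnclean is a tagged field, a record that does not carry it decodes with the default value false, and readers that do not know the tag skip it. Kafka's generated serializers only write a tagged field when its value differs from the default. The Python sketch below illustrates this with the KIP-482 tagged-field section layout: an unsigned-varint field count, then a tag, a size and a payload per field. It handles only this one field and is not Kafka's generated code; the function names are hypothetical.

```python
IS_UNCLEAN_TAG = 0  # tag assigned to IsUnclean in PartitionRecord


def write_uvarint(value: int, out: bytearray) -> None:
    """Unsigned varint: 7 bits per byte, high bit set on all but the last byte."""
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value)


def read_uvarint(buf: bytes, pos: int) -> tuple:
    """Return (value, next position)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if byte < 0x80:
            return result, pos
        shift += 7


def encode_tagged_fields(is_unclean: bool) -> bytes:
    """Encode the tagged-field section; a field equal to its default is omitted."""
    fields = {}
    if is_unclean:
        fields[IS_UNCLEAN_TAG] = b"\x01"  # a bool is encoded as one byte
    out = bytearray()
    write_uvarint(len(fields), out)
    for tag in sorted(fields):  # tags are written in ascending order
        write_uvarint(tag, out)
        write_uvarint(len(fields[tag]), out)
        out += fields[tag]
    return bytes(out)


def decode_is_unclean(buf: bytes) -> bool:
    """Read IsUnclean from a tagged-field section, skipping unknown tags."""
    count, pos = read_uvarint(buf, 0)
    is_unclean = False  # schema default when the tag is absent
    for _ in range(count):
        tag, pos = read_uvarint(buf, pos)
        size, pos = read_uvarint(buf, pos)
        if tag == IS_UNCLEAN_TAG:
            is_unclean = buf[pos] != 0
        pos += size
    return is_unclean
```

With this layout, a clean partition costs a single zero byte for the whole tagged section: `encode_tagged_fields(False)` is `b"\x00"`.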

PartitionChangeRecord

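Add a property to indicate to the leader of the topic partition that it was elected using unclean leader election. Because a PartitionChangeRecord only carries the fields that changed, the property uses -1 to mean that it did not change, 0 to mean that the leader was elected from the ISR, and 1 to mean that the leader was elected using unclean leader election.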

Code Block
  {
    "apiKey": 5,
    "type": "metadata",
    "name": "PartitionChangeRecord",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The partition id." },
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique ID of this topic." },
      { "name": "Isr", "type": "[]int32", "default": "null", "entityType": "brokerId",
        "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
        "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
      { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
        "versions": "0+", "taggedVersions": "0+", "tag": 1,
        "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
      { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
        "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,
        "about": "null if the replicas didn't change; the new replicas otherwise." },
      { "name": "RemovingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
        "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 3,
        "about": "null if the removing replicas didn't change; the new removing replicas otherwise." },
      { "name": "AddingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
        "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
        "about": "null if the adding replicas didn't change; the new adding replicas otherwise." },
      // ----- Properties added by this KIP -----
      { "name": "IsUnclean", "type": "int8", "default": "-1", "versions": "0+", "taggedVersions": "0+", "tag": 5,
        "about": "-1 if it didn't change; 0 if the leader was elected from the ISR; 1 if the leader was elected using unclean leader election." }
    ]
  }
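A Python sketch of how a broker might replay a PartitionChangeRecord onto its current view of a partition, using the sentinel defaults from the schema (null ISR, Leader -2, IsUnclean -1 all mean "unchanged"). The PartitionImage class and apply_change function are hypothetical. Replicas, AddingReplicas and RemovingReplicas are left out for brevity. The sketch assumes that the leader epoch is bumped only when the leader changes, while the partition epoch is bumped on every change, as its description in PartitionRecord states.

```python
from dataclasses import dataclass, replace

NO_LEADER = -1             # Leader value meaning "there is now no leader"
LEADER_UNCHANGED = -2      # default for Leader in PartitionChangeRecord
IS_UNCLEAN_UNCHANGED = -1  # default for IsUnclean in PartitionChangeRecord


@dataclass(frozen=True)
class PartitionImage:
    """Hypothetical broker-side view built from a PartitionRecord."""
    leader: int
    leader_epoch: int
    partition_epoch: int
    isr: tuple
    is_unclean: bool


def apply_change(image: PartitionImage, record: dict) -> PartitionImage:
    """Replay a PartitionChangeRecord given as a dict; absent tagged fields
    take their schema defaults, which all mean "unchanged"."""
    is_unclean = record.get("IsUnclean", IS_UNCLEAN_UNCHANGED)
    if is_unclean not in (IS_UNCLEAN_UNCHANGED, 0, 1):
        raise ValueError(f"invalid IsUnclean value {is_unclean}")
    new = replace(image, partition_epoch=image.partition_epoch + 1)
    if record.get("Isr") is not None:
        new = replace(new, isr=tuple(record["Isr"]))
    leader = record.get("Leader", LEADER_UNCHANGED)
    if leader != LEADER_UNCHANGED:
        # Assumption: the leader epoch is bumped only when the leader changes.
        new = replace(new, leader=leader, leader_epoch=image.leader_epoch + 1)
    if is_unclean != IS_UNCLEAN_UNCHANGED:
        new = replace(new, is_unclean=(is_unclean == 1))
    return new
```

The tri-state matters here: with a plain boolean defaulting to false, every later change record that omitted the field (an ISR change, for example) would silently clear the unclean flag.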

Compatibility, Deprecation, and Migration Plan

At a high level, this change is backward compatible because the default value for all of the "is unclean" properties in the protocol is "false". For backward compatibility, it is important to note that if the "is unclean" field is true, then the ISR is guaranteed to have a size of 1. The topic partition leader will not increase the ISR until it has recovered from the unclean leader election and has set the "is unclean" field to false.
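A Python sketch of a controller-side check that enforces this invariant when handling AlterIsr. The function name and error strings are hypothetical. Two rules are assumptions that this KIP does not spell out: only the controller may set the flag (during election), and the ISR may not grow in the same request that clears the flag.

```python
from typing import Optional


def validate_alter_isr(current_isr: list, current_is_unclean: bool,
                       new_isr: list, new_is_unclean: bool) -> Optional[str]:
    """Return an error message if the update would break the rule that an
    unclean partition has a single-replica ISR, else None."""
    if new_is_unclean and not current_is_unclean:
        # Assumption: only the controller sets the flag, during election.
        return "a leader cannot mark itself as uncleanly elected"
    if new_is_unclean and len(new_isr) != 1:
        return "ISR must contain only the leader while unclean"
    if current_is_unclean and len(new_isr) > len(current_isr):
        # Assumption: recovery must be acknowledged before the ISR grows.
        return "ISR cannot grow until recovery has been acknowledged"
    return None
```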

Controller

If the ZooKeeper controller supports this feature, then when it performs an unclean leader election it will write "true" in ZooKeeper and set "true" in the LeaderAndIsr request. If the broker does not support this feature, it will ignore the "is unclean" field in the LeaderAndIsr request and behave as it does today. When such a broker sends an AlterIsr request to the controller, the controller will interpret the missing "is unclean" field as "false", its default value. Similar logic applies to the KRaft controller.
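A Python sketch of the version handling described above, with dicts standing in for the request structures and hypothetical function names. It roughly mirrors how Kafka's generated message classes treat an ignorable field that the target version cannot carry: the field is dropped instead of the request failing. On the receiving side, a version that lacks the field yields its default.

```python
LEADER_AND_ISR_IS_UNCLEAN_VERSION = 6  # IsUnclean added in LeaderAndIsr v6
ALTER_ISR_IS_UNCLEAN_VERSION = 1       # NewIsUnclean added in AlterIsr v1


def leader_and_isr_partition_for(version: int, partition_state: dict) -> dict:
    """Drop IsUnclean for brokers that only speak an older LeaderAndIsr version.
    The field is ignorable, so it is omitted rather than rejected."""
    if version >= LEADER_AND_ISR_IS_UNCLEAN_VERSION:
        return dict(partition_state)
    return {k: v for k, v in partition_state.items() if k != "IsUnclean"}


def is_unclean_from_alter_isr(version: int, partition_data: dict) -> bool:
    """An older broker cannot send NewIsUnclean, so the default (false) applies."""
    if version < ALTER_ISR_IS_UNCLEAN_VERSION:
        return False
    return bool(partition_data.get("NewIsUnclean", False))
```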

Rejected Alternatives

An alternative solution is to store the leader epoch of the unclean leader election in the "is unclean" field instead of a boolean. The topic partition leader would then perform unclean recovery when the unclean leader epoch equals the current leader epoch. One issue with this solution is that the controller changes the leader epoch when the leader goes offline, so the controller would also have to reset the unclean leader epoch whenever the leader goes offline.
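A small Python sketch of this trade-off. Both classes and the helper are hypothetical. In the epoch-based design, the epoch bump that happens when the leader goes offline makes the equality check stop matching unless the controller also rewrites the stored unclean epoch. The boolean flag is unaffected by epoch bumps.

```python
from dataclasses import dataclass, replace

NO_LEADER = -1


@dataclass(frozen=True)
class EpochBased:
    """Rejected design: remember the leader epoch of the unclean election."""
    leader: int
    leader_epoch: int
    unclean_leader_epoch: int

    def needs_recovery(self) -> bool:
        return self.unclean_leader_epoch == self.leader_epoch


@dataclass(frozen=True)
class FlagBased:
    """Chosen design: a boolean "is unclean" flag."""
    leader: int
    leader_epoch: int
    is_unclean: bool

    def needs_recovery(self) -> bool:
        return self.is_unclean


def leader_goes_offline(state):
    """The controller bumps the leader epoch when the leader goes offline."""
    return replace(state, leader=NO_LEADER, leader_epoch=state.leader_epoch + 1)
```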