
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

If none of the in-sync replicas are alive, the controller allows the user to elect a replica that was not part of the in-sync replica set, using the unclean leader election strategy. Since this new leader replica was not part of the last known in-sync replica set, data loss is possible: log records committed by the previous leader(s) may be deleted. In addition, the loss of these records can cause inconsistencies with other parts of the system, such as the transaction coordinators and the group coordinators. If the controller can tell the new topic partition leader that it was elected using unclean leader election, the new leader can coordinate this recovery and mitigate these inconsistencies.

Currently, there is no way for brokers to tell whether a given leader was an unclean leader at the time of its election. The broker can use this information to invoke the appropriate data loss handling routine. One such scenario is handling data loss in a partition that is part of a transaction.

Another scenario is a partition that holds some kind of metadata, where any data loss in this partition requires automated or manual intervention.

Changed Interfaces

  • The LeaderAndIsr data saved in ZooKeeper will include a boolean flag that indicates whether the current leader is unclean.

Proposed Changes

This KIP proposes extending the communication between the controller and brokers to include enough information so that topic partition leaders know if they were elected because of an unclean leader election. This feature needs to support both the traditional ZooKeeper controller and the more recent KRaft controller.

ZooKeeper Controller

The controller will use ZooKeeper to persist the decision that it elected a leader through the unclean leader election strategy. This decision will get propagated to all of the affected brokers through the LeaderAndIsr request. Topic partition leaders will inform the controller that they have recovered from the unclean leader election using the AlterIsr request.

KRaft Controller

The controller will persist the decision that it elected a leader through the unclean leader election strategy in the cluster metadata internal partition using the PartitionRecord or PartitionChangeRecord. These records will get replicated to all of the brokers as described in KIP-595, KIP-630 and KIP-631. Topic partition leaders will inform the controller that they have recovered from unclean leader election using the AlterIsr request.

Changed Public Interfaces

ZooKeeper State

The state of the partition, stored as JSON at /brokers/topics/<topic>/partitions/<partition>/state, will be extended with a new property named is_unclean, giving the following schema:

Code Block
{
  "version": 1,
  "leader": NUMBER,
  "leader_epoch": NUMBER,
  "controller_epoch": NUMBER,
  "isr": ARRAY of NUMBER,
  "is_unclean": BOOLEAN // New property
}

The property is_unclean indicates whether the current leader was elected using the unclean leader election strategy. If the property is missing, its value defaults to false.
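The default matters because znodes written before this change have no is_unclean property. The sketch below shows one way a reader could decode the state with that default. It is illustrative only: the class name PartitionStateZNode is hypothetical, and it works on JSON already parsed into a Map rather than using any particular JSON library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical decoder for the partition state znode. It assumes the JSON has
// already been parsed into a Map; a real broker would use a JSON library first.
final class PartitionStateZNode {
    final int leader;
    final int leaderEpoch;
    final int controllerEpoch;
    final List<Integer> isr;
    final boolean isUnclean;

    private PartitionStateZNode(int leader, int leaderEpoch, int controllerEpoch,
                                List<Integer> isr, boolean isUnclean) {
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
        this.controllerEpoch = controllerEpoch;
        this.isr = isr;
        this.isUnclean = isUnclean;
    }

    static PartitionStateZNode fromJson(Map<String, Object> json) {
        List<Integer> isr = new ArrayList<>();
        for (Object id : (List<?>) json.get("isr")) {
            isr.add(((Number) id).intValue());
        }
        // "is_unclean" is absent in znodes written before this KIP: read it as false.
        boolean isUnclean = Boolean.TRUE.equals(json.get("is_unclean"));
        return new PartitionStateZNode(
            ((Number) json.get("leader")).intValue(),
            ((Number) json.get("leader_epoch")).intValue(),
            ((Number) json.get("controller_epoch")).intValue(),
            isr,
            isUnclean);
    }
}
```

Using Boolean.TRUE.equals(...) treats both a missing property and an explicit false as a clean leader.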

LeaderAndIsr RPC

Request Schema

Add a property to indicate to the leader of the topic partition that it was elected using unclean leader election.

Code Block
{
  "apiKey": 4,
  "type": "request",
  "listeners": ["zkBroker"],
  "name": "LeaderAndIsrRequest",
  // Version 1 adds IsNew.
  //
  // Version 2 adds broker epoch and reorganizes the partitions by topic.
  //
  // Version 3 adds AddingReplicas and RemovingReplicas.
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds Topic ID and Type to the TopicStates, as described in KIP-516.
  //
  // Version 6 adds IsUnclean as described in KIP-704.
  "validVersions": "0-6",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The current controller ID." },
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The current controller epoch." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1",
      "about": "The current broker epoch." },
    { "name": "Type", "type": "int8", "versions": "5+",
      "about": "The type that indicates whether all topics are included in the request"},
    { "name": "UngroupedPartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "0-1",
      "about": "The state of each partition, in a v0 or v1 message." },
    // In v0 or v1 requests, each partition is listed alongside its topic name.
    // In v2+ requests, partitions are organized by topic, so that each topic name
    // only needs to be listed once.
    { "name": "TopicStates", "type": "[]LeaderAndIsrTopicState", "versions": "2+",
      "about": "Each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "5+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "PartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "2+",
        "about": "The state of each partition" }
    ]},
    { "name": "LiveLeaders", "type": "[]LeaderAndIsrLiveLeader", "versions": "0+",
      "about": "The current live leaders.", "fields": [
      { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The leader's broker ID." },
      { "name": "HostName", "type": "string", "versions": "0+",
        "about": "The leader's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The leader's port." }
    ]}
  ],
  "commonStructs": [
    { "name": "LeaderAndIsrPartitionState", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0-1", "entityType": "topicName", "ignorable": true,
        "about": "The topic name.  This is only present in v0 or v1." },
      { "name": "PartitionIndex", "type": "int32", "versions": "0+",
        "about": "The partition index." },
      { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
        "about": "The controller epoch." },
      { "name": "Leader", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The broker ID of the leader." },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
        "about": "The leader epoch." },
      { "name": "Isr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
        "about": "The in-sync replica IDs." },
      { "name": "ZkVersion", "type": "int32", "versions": "0+",
        "about": "The ZooKeeper version." },
      { "name": "Replicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
        "about": "The replica IDs." },
      { "name": "AddingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true, "entityType": "brokerId",
        "about": "The replica IDs that we are adding this partition to, or null if no replicas are being added." },
      { "name": "RemovingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true, "entityType": "brokerId",
        "about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
      { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
        "about": "Whether the replica should have existed on the broker or not." },
      // -------- Properties added by this KIP ---------
      { "name": "IsUnclean", "type": "bool", "versions": "6+", "default": "false", "ignorable": true,
        "about": "Whether the assigned leader was elected using the unclean leader election strategy." }
    ]}
  ]
}

This boolean is sent to the brokers in the LeaderAndIsrRequest as part of the LeaderAndIsrPartitionState struct in the commonStructs array.

Response Schema

No changes required to the response.
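Because IsUnclean is declared with "versions": "6+" and "ignorable": true, a controller that has to send an older LeaderAndIsrRequest version cannot carry the flag. The sketch below models, as we understand the message generator's behavior, what a receiving broker would decode in that case. It is a simplified model under that assumption, not Kafka's generated code; the class and method names are hypothetical, and IllegalStateException stands in for the exception the real serializer would raise.

```java
// Hypothetical model of a field declared "versions": "6+", "ignorable": true
// when a LeaderAndIsrRequest is written at an older version.
final class IsUncleanVersionGate {
    static final short FIRST_VERSION_WITH_IS_UNCLEAN = 6;

    // Returns the IsUnclean value a receiver decodes for a request written at `version`.
    static boolean decodedIsUnclean(boolean isUnclean, short version, boolean ignorable) {
        if (version >= FIRST_VERSION_WITH_IS_UNCLEAN) {
            return isUnclean; // the field is on the wire
        }
        if (isUnclean && !ignorable) {
            // A non-ignorable field with a non-default value cannot be downgraded.
            throw new IllegalStateException("IsUnclean requires LeaderAndIsr version 6 or later");
        }
        return false; // dropped silently; the receiver sees the default
    }
}
```

With the schema as proposed (ignorable), a broker that only understands version 5 or earlier would treat an unclean leader as clean instead of failing the request.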

AlterIsr RPC

Request Schema

The schema will include a new property that the leader uses to tell the controller when it has finished handling an unclean leader election.

Code Block
  {
    "apiKey": 56,
    "type": "request",
    "listeners": ["zkBroker", "controller"],
    "name": "AlterIsrRequest",
    "validVersions": "0-1",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The ID of the requesting broker" },
      { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
        "about": "The epoch of the requesting broker" },
      { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
        { "name":  "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The name of the topic to alter ISRs for" },
        { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index" },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The leader epoch of this partition" },
          { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
            "about": "The ISR for this partition"},
          // ----- Start of properties added by this KIP -----
          { "name": "NewIsUnclean", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
            "about": "Whether the leader finished recovering from unclean leader election." },
          // ----- End of properties added by this KIP -----
          { "name": "CurrentIsrVersion", "type": "int32", "versions": "0+",
            "about": "The expected version of ISR which is being updated"}
        ]}
      ]}
    ]
  }

Response Schema

Add a property to indicate to the leader of the topic partition that it was elected using unclean leader election.

Code Block
  {
    "apiKey": 56,
    "type": "response",
    "name": "AlterIsrResponse",
    "validVersions": "0-1",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The top level response error code" },
      { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
        { "name":  "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The name of the topic" },
        { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index" },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The partition level error code" },
          { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
            "about": "The broker ID of the leader." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The leader epoch." },
          { "name": "Isr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
            "about": "The in-sync replica IDs." },
          // ----- Start of properties added by this KIP -----
          { "name": "IsUnclean", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
            "about": "Whether the assigned leader was elected using the unclean leader election strategy." },
          // ----- End of properties added by this KIP ----- 
          { "name": "CurrentIsrVersion", "type": "int32", "versions": "0+",
            "about": "The current ISR version." }
        ]}
      ]}
    ]
  }
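The request and response above form a small round trip: the recovered leader sends NewIsUnclean = false, and the controller answers with the resulting IsUnclean. The sketch below models that exchange for one partition. All names are hypothetical; the error names mirror Kafka's FENCED_LEADER_EPOCH and INVALID_UPDATE_VERSION, and the rule that a leader may clear but never set the flag is an assumption drawn from the rules later in this page.

```java
import java.util.List;

// Hypothetical sketch of the AlterIsr exchange for clearing the unclean flag.
final class AlterIsrUncleanSketch {
    enum ErrorCode { NONE, FENCED_LEADER_EPOCH, INVALID_UPDATE_VERSION }

    // Controller-side state of one partition.
    static final class PartitionState {
        final int leader;
        final int leaderEpoch;
        List<Integer> isr;
        boolean isUnclean;
        int isrVersion;

        PartitionState(int leader, int leaderEpoch, List<Integer> isr, boolean isUnclean, int isrVersion) {
            this.leader = leader;
            this.leaderEpoch = leaderEpoch;
            this.isr = List.copyOf(isr);
            this.isUnclean = isUnclean;
            this.isrVersion = isrVersion;
        }
    }

    // AlterIsrRequest PartitionData fields used here (v1 adds NewIsUnclean).
    record PartitionData(int partitionIndex, int leaderEpoch, List<Integer> newIsr,
                         boolean newIsUnclean, int currentIsrVersion) {}

    // AlterIsrResponse PartitionData fields used here (v1 adds IsUnclean).
    record PartitionResult(int partitionIndex, ErrorCode error, int leaderId, int leaderEpoch,
                           List<Integer> isr, boolean isUnclean, int currentIsrVersion) {}

    // Leader side: after recovery, ask to clear the flag and keep the ISR as-is.
    static PartitionData recoveredRequest(int partitionIndex, int leaderEpoch, List<Integer> isr, int isrVersion) {
        return new PartitionData(partitionIndex, leaderEpoch, isr, false, isrVersion);
    }

    // Controller side: reject stale requests, otherwise apply the change.
    static PartitionResult handle(int partitionIndex, PartitionState s, PartitionData req) {
        ErrorCode error = ErrorCode.NONE;
        if (req.leaderEpoch() != s.leaderEpoch) {
            error = ErrorCode.FENCED_LEADER_EPOCH;
        } else if (req.currentIsrVersion() != s.isrVersion) {
            error = ErrorCode.INVALID_UPDATE_VERSION;
        } else {
            s.isr = List.copyOf(req.newIsr());
            // Assumption: the leader can clear the flag but never set it.
            s.isUnclean = s.isUnclean && req.newIsUnclean();
            s.isrVersion++;
        }
        return new PartitionResult(partitionIndex, error, s.leader, s.leaderEpoch,
                                   s.isr, s.isUnclean, s.isrVersion);
    }
}
```

Checking the leader epoch first means a leader that has since been replaced cannot clear the flag for its successor.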

Cluster Metadata Records

PartitionRecord

Add a property to indicate to the leader of the topic partition that it was elected using unclean leader election.

Code Block
  {
    "apiKey": 3,
    "type": "metadata",
    "name": "PartitionRecord",
    "validVersions": "0-1",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The partition id." },
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique ID of this topic." },
      { "name": "Replicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",
        "about": "The replicas of this partition, sorted by preferred order." },
      { "name": "Isr", "type":  "[]int32", "versions":  "0+",
        "about": "The in-sync replicas of this partition" },
      { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",
        "about": "The replicas that we are in the process of removing." },
      { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",
        "about": "The replicas that we are in the process of adding." },
      { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
        "about": "The lead replica, or -1 if there is no leader." },
      // ----- Start of properties added by this KIP -----
      { "name": "IsUnclean", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
        "about": "Whether the assigned leader was elected using the unclean leader election strategy." },
      // ----- End of properties added by this KIP -----  
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The epoch of the partition leader." },
      { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "An epoch that gets incremented each time we change anything in the partition." }
    ]
  }


PartitionChangeRecord

Add a property to indicate to the leader of the topic partition that it was elected using unclean leader election.

Code Block
  {
    "apiKey": 5,
    "type": "metadata",
    "name": "PartitionChangeRecord",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The partition id." },
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique ID of this topic." },
      { "name": "Isr", "type":  "[]int32", "default": "null", "entityType": "brokerId",
        "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
        "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
      { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
        "versions": "0+", "taggedVersions": "0+", "tag": 1,
        "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
      { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
        "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,
        "about": "null if the replicas didn't change; the new replicas otherwise." },
      { "name": "RemovingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
        "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 3,
        "about": "null if the removing replicas didn't change; the new removing replicas otherwise." },
      { "name": "AddingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
        "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
        "about": "null if the adding replicas didn't change; the new adding replicas otherwise." },
      // ----- Properties added by this KIP -----
      { "name": "IsUnclean", "type": "bool", "default": "false", "versions": "0+", "taggedVersions": "0+", "tag": 5,
        "about": "Whether the assigned leader was elected using the unclean leader election strategy." }
    ]
  }
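Brokers rebuild partition state by replaying these records in order. The sketch below shows how IsUnclean could be applied during replay, reading the two schemas literally. The class and method names are hypothetical, and only the fields needed for the flag are modelled.

```java
import java.util.List;

// Hypothetical sketch of replaying PartitionRecord and PartitionChangeRecord.
final class UncleanFlagReplaySketch {
    static final int NO_LEADER_CHANGE = -2; // PartitionChangeRecord: "leader didn't change"

    // Subset of PartitionRecord fields.
    record PartitionRecord(int partitionId, List<Integer> isr, int leader, boolean isUnclean) {}

    // Subset of PartitionChangeRecord fields. A null isr or a leader of -2 means
    // "unchanged". IsUnclean is tagged with default false, so a record that omits
    // the tag decodes to false.
    record PartitionChangeRecord(int partitionId, List<Integer> isr, int leader, boolean isUnclean) {}

    // Broker-side image of one partition.
    static final class Partition {
        List<Integer> isr;
        int leader;
        boolean isUnclean;
    }

    static Partition replay(PartitionRecord r) {
        Partition p = new Partition();
        p.isr = r.isr();
        p.leader = r.leader();
        p.isUnclean = r.isUnclean();
        return p;
    }

    static void replay(Partition p, PartitionChangeRecord r) {
        if (r.isr() != null) p.isr = r.isr();
        if (r.leader() != NO_LEADER_CHANGE) p.leader = r.leader();
        // Literal reading of the schema: the decoded value always replaces the flag.
        p.isUnclean = r.isUnclean();
    }
}
```

Because the tag defaults to false, a change record that only updates the ISR would also clear the flag unless the controller writes IsUnclean = true explicitly while the leader is still recovering.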

Compatibility, Deprecation, and Migration Plan

TODO: Fill this out. I don't think there are any compatibility considerations.

TODO: This depends on how the partition leader will coordinate with the transaction and group coordinator.

TODO: This depends on how the client handles offset out of range but this should already be covered by the Kafka consumer.


  • The leader replica will use the AlterIsr RPC to set the IsUncleanLeader state to false once it has completed the desired recovery.

Proposed Changes

This KIP proposes to append a boolean state to the LeaderAndIsr state maintained in ZooKeeper. The new boolean state will be called IsUncleanLeader. When set to true, it signifies that the current leader was elected as an unclean leader; otherwise it is false. The flag is maintained according to the following rules:

  • The controller sets it to true when it makes an unclean leader election.
  • The controller sets it to false when it elects an in-sync replica as the leader.
  • The leader replica may set it to false once it has run an appropriate data loss handling routine.
  • All other operations that mutate the LeaderAndIsr state, such as expandIsr/shrinkIsr, retain the state as-is.
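The four rules above amount to a small state machine over one boolean. The sketch below encodes them directly; the enum and class names are hypothetical labels for the events the rules describe.

```java
// Hypothetical model of the IsUncleanLeader rules.
final class UncleanLeaderFlag {
    enum Event { UNCLEAN_ELECTION, CLEAN_ELECTION, LEADER_RECOVERED, ISR_CHANGE }

    static boolean next(boolean current, Event event) {
        return switch (event) {
            case UNCLEAN_ELECTION -> true;                  // controller elected a non-ISR replica
            case CLEAN_ELECTION, LEADER_RECOVERED -> false; // ISR leader elected, or recovery done
            case ISR_CHANGE -> current;                     // expandIsr/shrinkIsr keep the flag
        };
    }

    // Applies a sequence of events starting from `initial`.
    static boolean apply(boolean initial, Event... events) {
        boolean flag = initial;
        for (Event e : events) {
            flag = next(flag, e);
        }
        return flag;
    }
}
```

For example, an unclean election followed by an ISR expansion leaves the flag set, and only a clean election or the leader's recovery clears it.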

The controller sends a LeaderAndIsrRequest to each replica when it elects a new leader. This KIP proposes to send the IsUncleanLeader flag to the replicas along with the rest of the LeaderAndIsrRequest data. The leader of a partition can use this information to invoke a data loss handling routine.

Once the partition leader has handled the IsUncleanLeader flag in the incoming LeaderAndIsrRequest, it may get the controller to modify its LeaderAndIsr state to set this flag to false.
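On the broker side, the flow is: ignore the flag unless this broker is the leader, run the data loss handling routine, then ask the controller to clear the flag. The sketch below shows that flow. DataLossHandler, ClearRequest and onPartitionState are hypothetical names, not existing Kafka interfaces, and the handler runs synchronously here for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical broker-side handling of the unclean flag in a LeaderAndIsrRequest.
final class UncleanLeaderHandlerSketch {
    // Hook for the data loss handling routine (for example, notifying the
    // transaction or group coordinators).
    interface DataLossHandler {
        void onUncleanLeader(String topic, int partition, int leaderEpoch);
    }

    // The AlterIsr entry the leader queues once recovery is done.
    record ClearRequest(String topic, int partition, int leaderEpoch, List<Integer> isr, boolean newIsUnclean) {}

    private final int localBrokerId;
    private final DataLossHandler handler;
    final List<ClearRequest> pendingAlterIsr = new ArrayList<>();

    UncleanLeaderHandlerSketch(int localBrokerId, DataLossHandler handler) {
        this.localBrokerId = localBrokerId;
        this.handler = handler;
    }

    // Called for each partition state in an incoming LeaderAndIsrRequest.
    void onPartitionState(String topic, int partition, int leader, int leaderEpoch,
                          List<Integer> isr, boolean isUnclean) {
        if (leader != localBrokerId || !isUnclean) {
            return; // followers and cleanly elected leaders have nothing to recover
        }
        handler.onUncleanLeader(topic, partition, leaderEpoch);
        // Recovery finished: ask the controller to clear the flag, ISR unchanged.
        pendingAlterIsr.add(new ClearRequest(topic, partition, leaderEpoch, List.copyOf(isr), false));
    }
}
```

A real implementation would likely run recovery asynchronously and send the AlterIsr request only after it completes; the queue here stands in for that outbound request.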


Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.