Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/ld2t2xkby7rpgrggqo1h344goddfdnxb

...

If none of the in-sync replicas are alive, the controller allows the user to elect a replica that was not a part of the in-sync replica set using the unclean leader election strategy. Since this new leader replica was not a part of the last known in-sync replica set, there is the possibility of data loss because log records committed by the previous leader(s) may be deleted. In addition, the loss of these records can cause inconsistencies with other parts of the system, such as the transaction coordinators and the group coordinators. If the controller is able to communicate to the new topic partition leader that it was elected using unclean leader election, the new topic partition leader can coordinate this recovery and mitigate these inconsistencies.

...

One of the ways Kafka is going to use this feature is to abort all pending transactions when recovering from unclean leader election. See KAFKA-7408 for more details.

Topic Partition Follower

With KIP-392, it is possible for a follower to receive a FETCH request from a consumer. The follower will return a NOT_LEADER_OR_FOLLOWER error while the leader is still recovering. This means that the follower will return this error from the time it receives a LEADER_AND_ISR request with RECOVERING set until it receives another request with RECOVERED. The follower will not ignore requests that have the same leader epoch.
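
The follower behavior described above can be sketched as a small state holder. This is only an illustration, not broker code: the class and method names are hypothetical, the error codes are modeled as plain strings, and the encoding RECOVERING = 1 is an assumption (the schema only states that the default value, 0, means RECOVERED).

```python
from dataclasses import dataclass

RECOVERED = 0   # schema default for LeaderRecoveryState
RECOVERING = 1  # assumed encoding, used only for this sketch

NONE = "NONE"
NOT_LEADER_OR_FOLLOWER = "NOT_LEADER_OR_FOLLOWER"


@dataclass
class FollowerPartition:
    """Hypothetical follower-side view of one topic partition."""
    leader_epoch: int = -1
    leader_recovery_state: int = RECOVERED

    def on_leader_and_isr(self, leader_epoch: int, leader_recovery_state: int) -> bool:
        """Apply a LEADER_AND_ISR update and return True if it was applied."""
        # Only strictly older epochs are dropped. A request carrying the
        # same leader epoch is still applied, so the follower observes the
        # RECOVERING -> RECOVERED transition.
        if leader_epoch < self.leader_epoch:
            return False
        self.leader_epoch = leader_epoch
        self.leader_recovery_state = leader_recovery_state
        return True

    def handle_consumer_fetch(self) -> str:
        """Return the error code for a consumer FETCH served by this follower."""
        if self.leader_recovery_state == RECOVERING:
            return NOT_LEADER_OR_FOLLOWER
        return NONE
```

The point of accepting an equal leader epoch is that the transition back to RECOVERED may arrive without a leader change.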

Controller

ZooKeeper Controller

...

Code Block
{
  "apiKey": 4,
  "type": "request",
  "listeners": ["zkBroker"],
  "name": "LeaderAndIsrRequest",
  // Version 1 adds IsNew.
  //
  // Version 2 adds broker epoch and reorganizes the partitions by topic.
  //
  // Version 3 adds AddingReplicas and RemovingReplicas.
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds Topic ID and Type to the TopicStates, as described in KIP-516.
  //
  // Version 6 adds LeaderRecoveryState as described in KIP-704.
  "validVersions": "0-6",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The current controller ID." },
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The current controller epoch." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1",
      "about": "The current broker epoch." },
    { "name": "Type", "type": "int8", "versions": "5+",
      "about": "The type that indicates whether all topics are included in the request"},
    { "name": "UngroupedPartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "0-1",
      "about": "The state of each partition, in a v0 or v1 message." },
    // In v0 or v1 requests, each partition is listed alongside its topic name.
    // In v2+ requests, partitions are organized by topic, so that each topic name
    // only needs to be listed once.
    { "name": "TopicStates", "type": "[]LeaderAndIsrTopicState", "versions": "2+",
      "about": "Each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "2+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "5+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "PartitionStates", "type": "[]LeaderAndIsrPartitionState", "versions": "2+",
        "about": "The state of each partition" }
    ]},
    { "name": "LiveLeaders", "type": "[]LeaderAndIsrLiveLeader", "versions": "0+",
      "about": "The current live leaders.", "fields": [
      { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The leader's broker ID." },
      { "name": "HostName", "type": "string", "versions": "0+",
        "about": "The leader's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The leader's port." }
    ]}
  ],
  "commonStructs": [
    { "name": "LeaderAndIsrPartitionState", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0-1", "entityType": "topicName", "ignorable": true,
        "about": "The topic name.  This is only present in v0 or v1." },
      { "name": "PartitionIndex", "type": "int32", "versions": "0+",
        "about": "The partition index." },
      { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
        "about": "The controller epoch." },
      { "name": "Leader", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The broker ID of the leader." },
      { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
        "about": "The leader epoch." },
      { "name": "Isr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
        "about": "The in-sync replica IDs." },
      { "name": "ZkVersion", "type": "int32", "versions": "0+",
        "about": "The ZooKeeper version." },
      { "name": "Replicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
        "about": "The replica IDs." },
      { "name": "AddingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true, "entityType": "brokerId",
        "about": "The replica IDs that we are adding this partition to, or null if no replicas are being added." },
      { "name": "RemovingReplicas", "type": "[]int32", "versions": "3+", "ignorable": true, "entityType": "brokerId",
        "about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
      { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
        "about": "Whether the replica should have existed on the broker or not." },
      // -------- Properties added by this KIP ---------
      { "name": "LeaderRecoveryState", "type": "int8", "versions": "6+", "default": "0", "ignorable": true,
        "about": "Indicates if the partition is recovering from an election." } 
    ]}
  ]
}

Request Handling

When the leader receives a LeaderAndIsr request with the LeaderRecoveryState set to RECOVERING, it will attempt to recover the partition irrespective of the leader's IBP.

Response Schema

No changes required to the response.

...

Code Block
  {
    "apiKey": 56,
    "type": "request",
    "listeners": ["zkBroker", "controller"],
    "name": "AlterPartitionRequest",
    "validVersions": "0-1",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
        "about": "The ID of the requesting broker" },
      { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
        "about": "The epoch of the requesting broker" },
      { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
        { "name":  "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The name of the topic to alter ISRs for" },
        { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index" },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The leader epoch of this partition" },
          { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
            "about": "The ISR for this partition"},
          // ----- Start of properties added by this KIP -----
          { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0", "ignorable": true,
            "about": "Indicates if the partition is recovering from an election." },
          // ----- End of properties added by this KIP -----
          { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
            "about": "The expected epoch of the partition which is being updated"}
        ]}
      ]}
    ]
  }

Request Handling

The controller will validate the LeaderRecoveryState field and return an INVALID_REQUEST error when:

  1. The size of the ISR is greater than 1 and the LeaderRecoveryState is RECOVERING.
  2. The LeaderRecoveryState is changing from RECOVERED to RECOVERING.

If the controller receives an AlterPartition request with a version of 0, it will assume that the leader has recovered from the unclean leader election.
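
As a rough sketch of these validation rules, assuming the second rule rejects a change from RECOVERED to RECOVERING: the function names, the string error code, and the encoding RECOVERING = 1 are assumptions made for illustration and are not part of the controller's API.

```python
from typing import List, Optional

RECOVERED = 0   # schema default for LeaderRecoveryState
RECOVERING = 1  # assumed encoding, used only for this sketch

INVALID_REQUEST = "INVALID_REQUEST"


def effective_recovery_state(request_version: int, field_value: Optional[int]) -> int:
    """Version 0 has no LeaderRecoveryState field, so it is read as RECOVERED."""
    if request_version == 0 or field_value is None:
        return RECOVERED
    return field_value


def validate_alter_partition(
    current_state: int,
    new_isr: List[int],
    request_version: int,
    field_value: Optional[int] = None,
) -> Optional[str]:
    """Return INVALID_REQUEST if the request breaks a rule, otherwise None."""
    new_state = effective_recovery_state(request_version, field_value)
    # Rule 1: a recovering partition must have an ISR of at most one replica.
    if new_state == RECOVERING and len(new_isr) > 1:
        return INVALID_REQUEST
    # Rule 2: a leader cannot move itself back into recovery.
    if current_state == RECOVERED and new_state == RECOVERING:
        return INVALID_REQUEST
    return None
```

For example, `validate_alter_partition(RECOVERING, [1, 2], 0)` passes, because a version 0 request is read as RECOVERED and the ISR may then grow.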

Response Schema

The response will be renamed from AlterIsrResponse to AlterPartitionResponse. The field CurrentIsrVersion will be renamed to PartitionEpoch.

Add a property to indicate to the leader of the topic partition the recovery state after processing the AlterPartition request. The broker does not take the value in this field as a trigger to start recovery. That happens through either the LeaderAndIsr request or the relevant record in KRaft mode.

Code Block
  {
    "apiKey": 56,
    "type": "response",
    "name": "AlterPartitionResponse",
    "validVersions": "0-1",
    "flexibleVersions": "0+",
    "fields": [
      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The top level response error code" },
      { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
        { "name":  "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The name of the topic" },
        { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index" },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The partition level error code" },
          { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
            "about": "The broker ID of the leader." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The leader epoch." },
          { "name": "Isr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
            "about": "The in-sync replica IDs." },
          // ----- Start of properties added by this KIP -----
          { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0", "ignorable": true,
            "about": "Indicates if the partition is recovering from an election." },
          // ----- End of properties added by this KIP ----- 
          { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
            "about": "The current epoch of the partition." }
        ]}
      ]}
    ]
  }

...

Compatibility, Deprecation, and Migration Plan

This feature will be guarded by an IBP bump. At a high level this change is backward compatible because the default value for all of the leader recovery state fields in the protocol is RECOVERED. When thinking about backward compatibility it is important to note that if the leader recovery state field is RECOVERING then the ISR is guaranteed to have a size of 1. The topic partition leader will not increase the ISR until it has recovered from the unclean leader election and has set the leader recovery state field to RECOVERED.

...

If the ZooKeeper controller supports this KIP and it is enabled because the IBP is large enough, when performing an unclean leader election it will write RECOVERING in ZK for the leader recovery state field and it will set RECOVERING in the LeaderAndIsr request. If the broker doesn't support this feature, the LeaderRecoveryState field in the LeaderAndIsr request will be ignored and the broker will behave as it currently does. When the broker sends the AlterPartition request to the controller with a version of 0, the controller will assume that the leader has recovered from the unclean leader election.

It is possible that the topic partition leader doesn't support this KIP and never sends an AlterPartition request to the controller. If this happens, it means that the ISR is set to the leader, so the controller will never elect another replica as leader. It is also possible that the broker has been upgraded to a version that supports this feature but its IBP hasn't been increased. If the leader receives a LeaderAndIsr request with the LeaderRecoveryState field set to RECOVERING, it will perform unclean leader recovery irrespective of the IBP and send an AlterPartition request that matches the current IBP.
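
A minimal sketch of the leader-side behavior in that last case, under stated assumptions: the class, the `ibp_supports_recovery_state` flag, and the dictionary-shaped request are hypothetical stand-ins, the recovery work itself is a placeholder, and RECOVERING = 1 is an assumed encoding. The only behavior taken from the text is that recovery runs regardless of the IBP while the AlterPartition version follows the IBP.

```python
from dataclasses import dataclass, field
from typing import Dict, List

RECOVERED = 0   # schema default for LeaderRecoveryState
RECOVERING = 1  # assumed encoding, used only for this sketch


@dataclass
class LeaderPartition:
    """Hypothetical leader-side view of one topic partition."""
    broker_id: int
    isr: List[int] = field(default_factory=list)
    leader_recovery_state: int = RECOVERED
    recovery_attempts: int = 0

    def on_leader_and_isr(self, isr: List[int], leader_recovery_state: int) -> None:
        self.isr = list(isr)
        self.leader_recovery_state = leader_recovery_state
        # Recovery is triggered by the request alone; the IBP is not consulted.
        if leader_recovery_state == RECOVERING:
            self.recover()

    def recover(self) -> None:
        # Placeholder for the real recovery work, which this sketch omits.
        self.recovery_attempts += 1
        self.leader_recovery_state = RECOVERED

    def build_alter_partition(self, ibp_supports_recovery_state: bool) -> Dict:
        """Build an AlterPartition request at the version the IBP allows."""
        version = 1 if ibp_supports_recovery_state else 0
        request: Dict = {"version": version, "NewIsr": list(self.isr)}
        if version >= 1:
            # The field only exists from version 1 onward.
            request["LeaderRecoveryState"] = self.leader_recovery_state
        return request
```

With an older IBP the request goes out as version 0 without the field, which the controller reads as RECOVERED.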

Clients

With this KIP the requests FETCH, PRODUCE, LIST_OFFSETS, DELETE_RECORDS and OFFSET_FOR_LEADER_EPOCH will return a NOT_LEADER_OR_FOLLOWER error for any topic partition for which the leader is recovering. This is backward compatible because the clients will retry this error.
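
To illustrate why this is backward compatible, here is a simplified model of a client retry loop. It is not the real client implementation: `send` and `refresh_metadata` are hypothetical callables supplied by the caller, error codes are plain strings, and backoff between attempts is omitted for brevity.

```python
from typing import Any, Callable, Tuple

NONE = "NONE"
NOT_LEADER_OR_FOLLOWER = "NOT_LEADER_OR_FOLLOWER"


def send_with_retry(
    send: Callable[[], Tuple[str, Any]],
    refresh_metadata: Callable[[], None],
    max_attempts: int = 5,
) -> Any:
    """Retry a request while the partition leader reports it is unavailable."""
    for _ in range(max_attempts):
        error, result = send()
        if error == NONE:
            return result
        if error != NOT_LEADER_OR_FOLLOWER:
            raise RuntimeError(f"non-retriable error: {error}")
        # The leader may have changed or may still be recovering, so
        # refresh the cached metadata before trying again.
        refresh_metadata()
    raise TimeoutError(f"gave up after {max_attempts} attempts")
```

From the client's point of view a recovering leader looks the same as a leader that has moved, so no client change is needed to handle it.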

...