Status

Current state: Accepted

Discussion thread: here

Vote thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. A change to the ELR does not require a leader epoch bump. In most cases, the ELR is updated together with the ISR. The only case in which the ELR changes alone is when an ELR broker registers after an unclean shutdown, and no leader epoch bump is needed in that case either.

  2. When updating the config min.insync.replicas, if the new min ISR <= current ISR, the ELR will be removed.

  3. A new metric of Electable leaders will be added. It reflects the count of (ISR + ELR).

  4. AlterPartitionReassignments. The leader updates the ISR implicitly with AlterPartition requests. The controller will make sure that, upon completion, the ELR only contains replicas in the final replica set. Additionally, to improve the durability of the reassignment:

    1. Currently, completing a reassignment only requires all the adding replicas to be in the ISR. This can leave just 1 replica in the ISR. The ELR may not help here because the removed ISR replicas cannot stay in the ELR once the reassignment completes. So we propose to enforce that a reassignment can only be completed if the ISR size is larger than or equal to min ISR.
    2. This min ISR requirement is also enforced when the reassignment is canceled.
  5. Add a new admin API, DescribeTopicsRequest, for showing the topic details. We don't want to embed the ELR info in the Metadata API, because the ELR is not a detail that needs to be exposed to user clients.

    1. More public-facing details are discussed in the DescribeTopicsRequest section.
  6. We also record the last-known ELR members.

    1. When an ELR member has an unclean shutdown, it is removed from the ELR and added to the LastKnownELR. The LastKnownELR is cleared when the ISR reaches min ISR.

    2. LastKnownELR is stored in the metadata log.

    3. LastKnownELR will also be useful in the Unclean Recovery section.

  7. The last known leader will be tracked.
    1. This can be used if Unclean Recovery is not enabled. More details are discussed in the Delivery plan.
    2. The controller will record the last ISR member (the leader) when it is fenced.
    3. It will be cleared when a new leader is elected.
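The bookkeeping rules in items 2, 3 and 6 above can be illustrated with a small model. This is only a sketch: the class `PartitionState` and its method names are hypothetical and are not part of the proposal or of the Kafka code base. It covers only the transitions stated in the list (the ELR is dropped when a new min ISR is already satisfied by the current ISR, an uncleanly shut down ELR member moves to LastKnownELR, and LastKnownELR is cleared once the ISR reaches min ISR).

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class PartitionState:
    """Hypothetical controller-side view of one partition (illustration only)."""
    min_isr: int
    isr: Set[int] = field(default_factory=set)
    elr: Set[int] = field(default_factory=set)
    last_known_elr: Set[int] = field(default_factory=set)

    def electable_leaders(self) -> int:
        # Item 3: the metric reflects the count of (ISR + ELR).
        return len(self.isr | self.elr)

    def on_unclean_shutdown(self, broker_id: int) -> None:
        # Item 6: an ELR member with an unclean shutdown moves to LastKnownELR.
        if broker_id in self.elr:
            self.elr.remove(broker_id)
            self.last_known_elr.add(broker_id)

    def on_isr_change(self, new_isr: Set[int]) -> None:
        self.isr = set(new_isr)
        self._maybe_clear_last_known_elr()

    def on_min_isr_update(self, new_min_isr: int) -> None:
        # Item 2: if the new min ISR <= current ISR size, the ELR is removed.
        self.min_isr = new_min_isr
        if new_min_isr <= len(self.isr):
            self.elr.clear()
        self._maybe_clear_last_known_elr()

    def _maybe_clear_last_known_elr(self) -> None:
        # Item 6: LastKnownELR is cleared when the ISR reaches min ISR.
        if len(self.isr) >= self.min_isr:
            self.last_known_elr.clear()
```

For example, a partition with min ISR 2, ISR {1} and ELR {2, 3} reports 3 electable leaders. After broker 3 has an unclean shutdown, the ELR is {2} and LastKnownELR is {3}.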

...

  1. During shutdown, the broker writes the current broker epoch to the CleanShutdownFile.

  2. During startup, the broker tries to read the broker epoch from the CleanShutdownFile and puts this broker epoch in the broker registration request.

  3. The controller verifies the broker epoch in the request against its registration record. If they are the same, it was a clean shutdown.

  4. If the broker shuts down before it receives the broker epoch, it will write -1.

Note: the CleanShutdownFile is removed after the log manager is initialized. It is created and written when the log manager is shutting down.
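The controller-side check in step 3 can be sketched as follows. The function name `is_clean_shutdown` and the constant `UNKNOWN_EPOCH` are hypothetical. Treating -1 explicitly as "unclean" is an assumption of this sketch; the text only says that -1 is written when no broker epoch was received.

```python
UNKNOWN_EPOCH = -1  # written when the broker never received a broker epoch


def is_clean_shutdown(previous_broker_epoch: int, registered_broker_epoch: int) -> bool:
    """Return True when the epoch reported at registration matches the
    epoch stored in the controller's registration record."""
    if previous_broker_epoch == UNKNOWN_EPOCH:
        # Assumption: an unknown epoch can never prove a clean shutdown.
        return False
    return previous_broker_epoch == registered_broker_epoch
```

A broker that restarts with a missing or stale CleanShutdownFile would report a non-matching epoch and therefore be handled as an unclean shutdown.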

Unclean recovery

As the new proposal allows the ISR to be empty, the leader election strategy has to be reviewed.

...

Unclean Recovery uses a deterministic way to elect the leader that has persisted the most data. At a high level, once unclean recovery is triggered, the controller uses a new API, GetReplicaLogInfo, to query the log end offset and the leader epoch from each replica. The replica with the highest leader epoch and, within that epoch, the longest log end offset will be the new leader. To help explain when and how Unclean Recovery is performed, let's first introduce some config changes.

The new unclean.recovery.strategy has the following 3 options.

Aggressive. It represents the intent of recovering availability as fast as possible.
Balanced. Auto recovery in potential data loss cases, waiting as needed for a better result.
None. Stop the partition on potential data loss.
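The selection rule described above (highest leader epoch first, then longest log end offset) can be sketched as below. `ReplicaLogInfo` and `pick_leader` are hypothetical names, not the actual controller code. Breaking a full tie by the lowest broker id is an assumption added here only to keep the result deterministic.

```python
from typing import Iterable, NamedTuple, Optional


class ReplicaLogInfo(NamedTuple):
    """One replica's reply, as the controller might collect it."""
    broker_id: int
    leader_epoch: int
    log_end_offset: int


def pick_leader(replies: Iterable[ReplicaLogInfo]) -> Optional[int]:
    """Pick the broker with the highest leader epoch, then the longest log.

    Returns None when no replica has replied.
    """
    best = max(
        replies,
        # Assumption: ties on (epoch, offset) go to the lowest broker id.
        key=lambda r: (r.leader_epoch, r.log_end_offset, -r.broker_id),
        default=None,
    )
    return None if best is None else best.broker_id
```

Note that a replica with a longer log but a lower leader epoch loses to a replica with a higher epoch, because the epoch is compared first.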

...

  1. If there are other ISR members, choose an ISR member.

  2. If there are unfenced ELR members, choose an ELR member.

  3. If there are fenced ELR members

    1. If unclean.recovery.strategy=Aggressive, then an unclean recovery will happen.

    2. Otherwise, we will wait for the fenced ELR members to be unfenced.

  4. If there are no ELR members

    1. If unclean.recovery.strategy=Aggressive, the controller will do the unclean recovery.

    2. If unclean.recovery.strategy=Balanced, the controller will do the unclean recovery when all the LastKnownELR members are unfenced. See the following section for the explanation.
    3. Otherwise (unclean.recovery.strategy=None), the controller will not attempt to elect a leader and will wait for user operations.
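The decision list above can be read as a single function. The sketch below is illustrative only: `choose_action`, its parameters, and the returned action labels are hypothetical, and the strategy values are the Aggressive/Balanced/None options of unclean.recovery.strategy.

```python
def choose_action(other_isr, unfenced_elr, fenced_elr, strategy,
                  last_known_elr_all_unfenced):
    """Return a label for what the controller does when a leader is needed.

    other_isr, unfenced_elr, fenced_elr: collections of broker ids.
    strategy: "Aggressive", "Balanced" or "None".
    """
    if other_isr:
        return "ELECT_FROM_ISR"            # step 1
    if unfenced_elr:
        return "ELECT_FROM_ELR"            # step 2
    if fenced_elr:                         # step 3
        if strategy == "Aggressive":
            return "UNCLEAN_RECOVERY"
        return "WAIT_FOR_ELR_UNFENCED"
    # Step 4: no ELR members at all.
    if strategy == "Aggressive":
        return "UNCLEAN_RECOVERY"
    if strategy == "Balanced":
        if last_known_elr_all_unfenced:
            return "UNCLEAN_RECOVERY"
        return "WAIT_FOR_LAST_KNOWN_ELR"
    return "WAIT_FOR_USER"                 # strategy None
```

The ordering matters: ISR and unfenced ELR members are always preferred, and the strategy only comes into play once neither is available.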

...

  • In Balanced mode, all the LastKnownELR members have replied, plus any replicas that replied within the timeout. Due to this requirement, the controller will only start the recovery if the LastKnownELR members are all unfenced.
  • In Aggressive mode, any replicas that replied within a fixed amount of time, OR the first response received after the timeout.

...

  1. The kafka-leader-election.sh tool will be upgraded to allow manual leader election.

    1. It can directly select a leader.

    2. It can trigger an unclean recovery for the replica with the longest log in either Aggressive or Balanced mode.

  2. Configs to update. Please refer to the Config changes section.
  3. For compatibility, the original unclean.leader.election.enable options True/False will be mapped to unclean.recovery.strategy options as follows.

...

    1. unclean.leader.election.enable.false -> unclean.recovery.strategy.Balanced
    2. unclean.leader.election.enable.true -> unclean.recovery.strategy.Aggressive
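As an illustration of this mapping, the sketch below derives unclean.recovery.strategy from a config dictionary. The helper `with_recovery_strategy` is hypothetical. Letting an explicitly set unclean.recovery.strategy take precedence over the legacy flag is an assumption of this sketch and is not stated in the text.

```python
def with_recovery_strategy(config: dict) -> dict:
    """Return a copy of config with unclean.recovery.strategy filled in
    from the legacy unclean.leader.election.enable flag when needed."""
    result = dict(config)
    legacy = result.get("unclean.leader.election.enable")
    # Assumption: an explicitly configured strategy is left untouched.
    if legacy is not None and "unclean.recovery.strategy" not in result:
        enabled = str(legacy).lower() == "true"
        result["unclean.recovery.strategy"] = "Aggressive" if enabled else "Balanced"
    return result
```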

Public Interfaces

We will deliver the KIP in phases, so the API changes are also marked coming with either ELR or Unclean Recovery.

PartitionChangeRecord (coming with ELR)

{
  "apiKey": 5,
  "type": "metadata",
  "name": "PartitionChangeRecord",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
...
// New fields begin.
    { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 6,
      "about": "null if the ELR didn't change; the new eligible leader replicas otherwise." },
    { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 7,
      "about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." },
    { "name": "LastKnownLeader", "type": "int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 8,
      "about": "-1 means no last known leader needs to be tracked." }
// New fields end.
  ]
}

...

{
  "apiKey":62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
...
// New fields begin.
    { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
      "about": "The epoch before a clean shutdown." }
// New fields end.
  ]
}

...

DescribeTopicPartitionsRequest (Coming with ELR)

Should be issued by admin clients. The broker will serve this request. As a part of the change, the admin client will start to use DescribeTopicPartitionsRequest to query the topic, instead of using the metadata requests.

On the other hand, the following changes may affect the client side.

  • The TopicPartitionInfo will also be updated to include the ELR info.
  • kafka-topics.sh does not have changes to its API but will have new fields in the output for the ELR, LastKnownELR, and LastKnownLeader.

ACL: Describe Topic

Limit: 20 topics max per request

For more admin client details, please refer to the Admin API/Client changes section.

The caller can list the topics of interest, or keep the field empty to request all of the topics.

Pagination.

This is newly introduced behavior. The caller can specify the maximum number of partitions to be included in the response.

If there are more partitions than the limit, the partitions beyond the limit and their topics will not be sent back. In this case, the Cursor field will be populated. The caller can include this cursor in the next request.

Note,

  • There is also a server-side config, max.request.partition.size.limit, to control the maximum number of partitions to return.
  • There is no consistency guarantee between requests.
  • It is an admin-client-facing API, so topic IDs are not supported.
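The pagination behavior described above can be modeled with a toy server and client. This is a sketch under assumptions: `describe_page` and `describe_all` are hypothetical helpers, topics are represented as a mapping from topic name to partition count, and iterating topics in sorted name order is an assumption about how the cursor is interpreted.

```python
def describe_page(topics, limit, cursor=None):
    """Toy server side: return up to `limit` (topic, partition) pairs
    starting at `cursor`, plus the cursor for the next page (or None)."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    start_topic, start_partition = cursor if cursor else (None, 0)
    page = []
    for name in sorted(topics):
        if start_topic is not None and name < start_topic:
            continue  # already returned in an earlier page
        first = start_partition if name == start_topic else 0
        for partition in range(first, topics[name]):
            if len(page) == limit:
                # More partitions remain: tell the caller where to resume.
                return page, (name, partition)
            page.append((name, partition))
    return page, None


def describe_all(topics, limit):
    """Toy client side: keep sending the returned cursor until it is empty."""
    results, cursor = [], None
    while True:
        page, cursor = describe_page(topics, limit, cursor)
        results.extend(page)
        if cursor is None:
            return results
```

Because there is no consistency guarantee between requests, a real client would also have to tolerate topics being created or deleted between pages. The toy model ignores that.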
{
  "apiKey": 74,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeTopicPartitionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
      "about": "The topics to fetch details for.",
      "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name" }
      ]
    },
    { "name": "ResponsePartitionLimit", "type": "int32", "versions": "0+", "default": "2000",
      "about": "The maximum number of partitions included in the response." },
    { "name": "Cursor", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The first topic and partition index to fetch details for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name for the first topic to process" },
      { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index to start with" }
    ]}
  ]
}

DescribeTopicPartitionsResponse

{
  "apiKey": 74,
  "type": "response",
  "name": "DescribeTopicPartitionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The topic error, or 0 if there was no error." },
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." },
      { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
        "about": "True if the topic is internal." },
      { "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
        "about": "Each partition in the topic.", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error, or 0 if there was no error." },
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the leader broker." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The leader epoch of this partition." },
        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of all nodes that host this partition." },
        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of nodes that are in sync with the leader for this partition." },
        { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
          "versions": "0+", "nullableVersions": "0+",
          "about": "The new eligible leader replicas otherwise." },
        { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
          "versions": "0+", "nullableVersions": "0+",
          "about": "The last known ELR." },
        { "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId",
          "about": "The set of offline replicas of this partition." }]},
      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
        "about": "32-bit bitfield to represent authorized operations for this topic." }]
    },
    { "name": "NextTopicPartition", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The next topic and partition index to fetch details for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name for the first topic to process" },
      { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index to start with" }
    ]}
  ]
}

CleanShutdownFile (Coming with ELR)

It will be a JSON file.

{
  "version": 0,
  "BrokerEpoch": "xxx"
}
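A minimal sketch of how a broker might write and consume this file is shown below. The helper names are hypothetical. Storing the epoch as a JSON number, returning -1 when the file is missing or unreadable, and deleting the file immediately after a successful read are simplifying assumptions (the text says the file is removed after the log manager is initialized).

```python
import json
import os


def write_clean_shutdown_file(path: str, broker_epoch: int) -> None:
    """Write the broker epoch on shutdown (-1 if no epoch was ever received)."""
    with open(path, "w") as f:
        json.dump({"version": 0, "BrokerEpoch": broker_epoch}, f)


def read_broker_epoch(path: str) -> int:
    """Read the previous broker epoch on startup and remove the file.

    Returns -1 when the file is missing or malformed (assumption: such a
    start is reported as having no known previous epoch).
    """
    try:
        with open(path) as f:
            epoch = int(json.load(f)["BrokerEpoch"])
    except (OSError, ValueError, KeyError, TypeError):
        return -1
    os.remove(path)
    return epoch
```

The returned value would then be placed in the PreviousBrokerEpoch field of the broker registration request.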

ElectLeadersRequest (Coming with Unclean Recovery)

ACL: CLUSTER_ACTION

Limit: 1000 partitions per request. If more than 1000 partitions are included, only the first 1000 will be served. Others will be returned with REQUEST_LIMIT_REACHED.

{
  "apiKey": 43,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "ElectLeadersRequest",
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
    ...
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
      "about": "The topic partitions to elect leaders.",
      "fields": [
        ...

// New fields begin. The same level with the Partitions
        { "name": "DesiredLeaders", "type": "[]int32", "versions": "3+", "nullableVersions": "3+",
          "about": "The desired leaders. The entry should match with the entry in Partitions by the index." }
// New fields end.
      ]
    },
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the election to complete." }
  ]
}

GetReplicaLogInfo Request (Coming with Unclean Recovery)

ACL: CLUSTER_ACTION

Limit: 2000 partitions per request. If more than 2000 partitions are included, only the first 2000 will be served. Others will be returned with REQUEST_LIMIT_REACHED.

{
  "apiKey": 70,
  "type": "request",
  "listeners": ["broker"],
  "name": "GetReplicaLogInfoRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the broker." },
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
      "about": "The topic partitions to query the log info.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID" },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions of this topic whose leader should be elected." }
      ]
    }
  ]
}

GetReplicaLogInfo Response

{
  "apiKey": 70,
  "type": "response",
  "name": "GetReplicaLogInfoResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "about": "The epoch for the broker." },
    { "name": "TopicPartitionLogInfoList", "type": "[]TopicPartitionLogInfo", "versions": "0+",
      "about": "The list of the log info.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID." },
        { "name": "PartitionLogInfo", "type": "[]PartitionLogInfo", "versions": "0+", "about": "The log info of a partition.",
          "fields": [
            { "name": "Partition", "type": "int32", "versions": "0+", "about": "The id for the partition." },
            { "name": "LastWrittenLeaderEpoch", "type": "int32", "versions": "0+", "about": "The last written leader epoch in the log." },
            { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "about": "The current leader epoch for the partition from the broker point of view." },
            { "name": "LogEndOffset", "type": "int64", "versions": "0+", "about": "The log end offset for the partition." },
            { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The result error, or zero if there was no error." },
            { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The result message, or null if there was no error." }
          ]
        }
      ]
    }
  ]
}


kafka-leader-election.sh (Coming with Unclean Recovery)

...
// Updated field starts.
--election-type <[PREFERRED, UNCLEAN, LONGEST_LOG_AGGRESSIVE, LONGEST_LOG_BALANCED, DESIGNATION]:
  election type>                          Type of election to attempt. Possible
                                          values are "preferred" for preferred
                                          leader election, "unclean" for a random
                                          unclean leader election,
                                          "longest_log_aggressive"/"longest_log_balanced"
                                          to choose the replica with the longest
                                          log, or "designation" for electing the
                                          given replica ("desiredLeader") to be
                                          the leader. If preferred election is
                                          selected, the election is only
                                          performed if the current leader is not
                                          the preferred leader for the topic
                                          partition. If
                                          longest_log_aggressive/longest_log_balanced/designation
                                          election is selected, the election is
                                          only performed if there is no leader
                                          for the topic partition. REQUIRED.
--path-to-json-file <String: Path to      The JSON file with the list of
  JSON file>                              partitions for which leader elections
                                          should be performed. This is an
                                          example format. The desiredLeader field
                                          is only required in DESIGNATION election.
                                          {"partitions":
                                            [{"topic": "foo", "partition": 1, "desiredLeader": 0},
                                             {"topic": "foobar", "partition": 2, "desiredLeader": 1}]
                                          }
                                          Not allowed if --all-topic-partitions
                                          or --topic flags are specified.
// Updated field ends.

Config changes

The new configs are introduced for ELR

  1. eligible.leader.replicas.enabled. It controls whether the controller will record the ELR-related metadata and whether ISR can be empty. False is the default value. It will turn true in the future.
  2. max.request.partition.size.limit. The maximum number of partitions to return in an API response.

The new configs are introduced for Unclean Recovery.

  1. unclean.recovery.strategy. Described in the above section. Balanced is the default value. 
  2. unclean.recovery.manager.enabled. True for using the unclean recovery manager to perform an unclean recovery. False means the random election will be used in the unclean leader election. False is the default value.
  3. unclean.recovery.timeout.ms. The time limits of waiting for the replicas' response during the Unclean Recovery. 5 min is the default value.

New Errors

REQUEST_LIMIT_REACHED

As we introduce the request limit for the new APIs, the items sent in the request but over the limit will be returned with REQUEST_LIMIT_REACHED. It is a retriable error.
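The limit handling can be sketched as follows: the server serves the first `limit` items and marks the rest with REQUEST_LIMIT_REACHED, and because the error is retriable the client resends only the rejected items. Both functions are hypothetical illustrations, not the actual broker or admin client code.

```python
def apply_request_limit(items, limit):
    """Toy server side: serve the first `limit` items, reject the rest."""
    served = list(items[:limit])
    rejected = {item: "REQUEST_LIMIT_REACHED" for item in items[limit:]}
    return served, rejected


def process_all(items, limit):
    """Toy client side: retry rejected items until everything is served."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    pending, done = list(items), []
    while pending:
        served, rejected = apply_request_limit(pending, limit)
        done.extend(served)
        # Only items that came back with the retriable error are resent.
        pending = [item for item in pending if item in rejected]
    return done
```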

Admin API/Client changes

The admin client will start to use the DescribeTopicsRequest to describe the topic.

  1. The client will split a large request into suitably sized pieces and send them one after another if the requested topic count reaches the limit.
  2. The client will send a follow-up request for the remaining topics if it receives a response with the Cursor field populated.
  3. The output of the topic describe will be updated with the ELR related fields.
  4. TopicPartitionInfo will be updated to include the ELR related fields.

Metrics

The following metrics will be added for ELR

...

  • min.insync.replicas will no longer take effect with a value larger than the replication factor. For existing configs, the effective min.insync.replicas will be min(min.insync.replicas, replication factor).

  • Cluster admins should update min.insync.replicas to 1 if they want replication to keep going when only the leader is in the ISR.

  • Note that this new requirement is not guarded by any feature flag or metadata version.
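The capping rule for existing configs is a one-line computation. The function name below is hypothetical.

```python
def effective_min_isr(configured_min_isr: int, replication_factor: int) -> int:
    """Effective min ISR: the configured value, capped at the replication factor."""
    return min(configured_min_isr, replication_factor)
```

For instance, a topic with replication factor 3 and min.insync.replicas=5 behaves as if min.insync.replicas were 3, while a value of 2 is left unchanged.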

ELR

It will be guarded by a new metadata version and the eligible.leader.replicas.enabled. So it is not enabled during the rolling upgrade.

After the controller has picked up the new MV and eligible.leader.replicas.enabled is true, when it loads the partition states, it will populate the ELR as empty if the PartitionChangeRecord uses an old version. In the next partition update, the controller will record the current ELR.

MV downgrade: Once the MV is downgraded, all the ELR related fields will be removed on the next partition change. The controller will also ignore the ELR fields.

Software downgrade: After the MV is downgraded, a metadata delta will be generated to capture the current status. Then the user can start the software downgrade.

Note, the leader can only enable empty ISR after the new metadata version.

Clean shutdown

It will be guarded by a new metadata version. Before that, the controller will treat all registrations as unclean shutdowns.

...

Unclean Recovery is guarded by the feature flag unclean.recovery.manager.enabled

  • For the existing unclean.leader.election.enable
    1. If true, unclean.recovery.strategy will be set to Aggressive.

    2. If false, unclean.recovery.strategy will be set to Balanced.

  • unclean.leader.election.enable will be marked as deprecated.

Delivery plan

The KIP is a large plan that can span multiple quarters, so we have to consider how to deliver the project in phases.

...

  • No extra metadata is required.

  • The leader will send fewer ISR updates. Generally, if removing a replica would bring the ISR below min ISR, which means the HWM can't advance, the leader will avoid updating the ISR.

  • In the special case where a follower is network-partitioned only from the controller, the replication will not be interrupted because fencing the replica does not result in the ISR falling below min ISR.

Cons

  • The ISR now can have fenced replicas and lagging replicas. This makes a gap in the current operations.

    • Admin API needs to be adjusted. DescribeTopic API has to be sent to the leader to query the "clean" ISR (without lagging and fenced replicas).

    • The operation on the topics like demote brokers requires "clean" ISR info to make sure the operation is safe.

    • The observability metrics need to be updated to reflect the "clean" ISR.

    • It may be confusing to the public that the proposal changes a well-known ISR concept.

Allow ISR shrink to empty + log-inspection-based unclean leader election

The idea is to allow ISR to shrink to size 0 if all the replicas are unavailable. This can ensure the last broker will not be automatically elected as leader after it comes back so that no silent data loss can happen. Then if we really lose all the replicas, we enforce an unclean leader election.

The second part of the design is the unclean leader election. Instead of choosing randomly from the replicas, the controller can query all the available brokers for their current log offset. Then the controller can choose the one with the longest log.

Pros

  • It can save more ack=1 messages.

  • It does not change any current invariants for the clients.

Cons

  • We can't definitely claim the choice does not have data loss.

  • The leader election only makes sense if enough replicas are unfenced. This is very likely to affect the overall service availability.

Current ISR + Sync phase + stability + shadow ISR

This proposal allows the last replica to become the leader even if it has an unclean shutdown. After it becomes the leader, it goes through a sync phase with a set of eligible followers for any lost data.

An eligible follower is defined by

...

It has not had an unclean shutdown.

Pros

  • It can claim no data loss for ack=all messages if there is an eligible follower to sync with.

Cons

  • It may affect the service availability if we have to wait for another shadow ISR member to get back online.

  • Implementation can be complicated.

  • HWM can move backward.

Rejected proposal for the unclean recovery

Enhance the ListOffsets API

One option is to enhance the ListOffsets API instead of creating a new API. The intention of the new API is to separate the admin API from the client API. Specifically, the recovery needs the broker epoch, but this piece of information does not need to be exposed to the clients.

The controller elects the leader when a given number of replicas have replied

In Balanced mode, instead of waiting for the last-known ELR members, the controller elects a leader once it has received a given number of responses. For example, if the replication factor is 3, min ISR is 2, and we require at least 2 responses, then the controller will choose between the first 2 replicas that respond.

However, this does not give enough protection if we don’t require responses from all the replicas. Consider the following case. The ISR starts as [0,1,2]. Broker 2 falls behind and is removed from the ISR. Then broker 1 and broker 0 suffer unclean shutdowns, and broker 1 has real data loss. Later, broker 2 and broker 1 come online, and the controller will choose between 1 and 2. Either option will have data loss.

In this model, broker 2 is unlikely to have the complete log, so just requiring a fixed number of responses does not improve durability much.
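The scenario above can be replayed with a small sketch. It assumes the controller elects the longest log among the first responses it receives. The class, the `Response` type, and all offsets (100 for the last committed offset, 90 for broker 1 after its data loss, 80 for the lagging broker 2) are hypothetical values chosen only to illustrate the argument.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of "elect after a given number of responses".
final class QuorumResponseElection {

    static final class Response {
        final int brokerId;
        final long logEndOffset;

        Response(int brokerId, long logEndOffset) {
            this.brokerId = brokerId;
            this.logEndOffset = logEndOffset;
        }
    }

    // Elects the longest log among the first `required` responses, in arrival order.
    // Returns empty while fewer than `required` responses have arrived.
    static Optional<Response> electAfter(List<Response> arrivals, int required) {
        if (arrivals.size() < required) {
            return Optional.empty();
        }
        Response best = null;
        for (Response r : arrivals.subList(0, required)) {
            if (best == null || r.logEndOffset > best.logEndOffset) {
                best = r;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        long committedOffset = 100L;  // assumed offset acknowledged before the failures
        // Broker 0 (complete log) is still offline; brokers 2 and 1 reply first.
        List<Response> arrivals = List.of(new Response(2, 80L), new Response(1, 90L));
        Response elected = electAfter(arrivals, 2).get();
        System.out.println("elected broker " + elected.brokerId
                + ", data loss: " + (elected.logEndOffset < committedOffset));
    }
}
```

Whichever of the two responders wins, its log ends before the assumed committed offset, which is the data loss the paragraph describes.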

Using a different set of configs

We also considered deprecating unclean.leader.election.enable and using unclean.recovery.strategy (Manual/Balanced/Proactive) instead. This would require a config conversion when we enable the unclean recovery manager.
