Status

Current state: Accepted

Discussion thread: here

Vote thread: here

...

  1. Changing the ELR does not require a leader epoch bump. In most cases, the ELR is updated together with the ISR. The only case in which the ELR changes on its own is when an ELR broker registers after an unclean shutdown, and no leader epoch bump is needed in that case.

  2. When updating the config min.insync.replicas, if the new min ISR <= current ISR, the ELR will be removed.

  3. A new metric of Electable leaders will be added. It reflects the count of (ISR + ELR).

  4. AlterPartitionReassignments. The leader updates the ISR implicitly with AlterPartition requests. The controller will make sure that, upon completion, the ELR only contains replicas in the final replica set. Additionally, to improve the durability of the reassignment:

    1. In the current behavior, a reassignment can complete once all the adding replicas are in the ISR. This can leave only 1 replica in the ISR. The ELR may not help here either, because ISR replicas that are being removed cannot stay in the ELR once the reassignment completes. So we propose to enforce that a reassignment can only be completed if the ISR size is larger than or equal to min ISR.
    2. This min ISR requirement is also enforced when the reassignment is canceled.
  5. Add a new admin API, DescribeTopicsRequest, for showing the topic details. We don't want to embed the ELR info in the Metadata API, because the ELR is not a detail that needs to be exposed to user clients.

    1. More public-facing details will be discussed in the DescribeTopicsRequest section.
  6. We also record the last-known ELR members.

    1. When an ELR member has an unclean shutdown, it is removed from the ELR and added to the LastKnownELR. The LastKnownELR is cleaned when the ISR reaches the min ISR.

    2. LastKnownELR is stored in the metadata log.

    3. LastKnownELR will also be useful in the Unclean Recovery section.

  7. The last known leader will be tracked.
    1. This can be used if Unclean Recovery is not enabled. More details will be discussed in the Deliver Plan.
    2. The controller will record the last ISR member (the leader) when it is fenced.
    3. It will be cleaned when a new leader is elected.
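
The ELR and LastKnownELR rules in the list above can be sketched as a small state model. This is only an illustration under stated assumptions: the class and method names are hypothetical, the ISR and ELR are assumed to be disjoint, and only the rules spelled out in this list are modeled (the full controller logic is not).

```python
from dataclasses import dataclass, field
from typing import Iterable, Set


@dataclass
class PartitionState:
    """Hypothetical in-memory view of one partition's replica sets."""
    min_isr: int
    isr: Set[int] = field(default_factory=set)
    elr: Set[int] = field(default_factory=set)
    last_known_elr: Set[int] = field(default_factory=set)

    def electable_leaders(self) -> int:
        # Item 3: the metric reflects the count of (ISR + ELR).
        return len(self.isr) + len(self.elr)

    def on_elr_member_unclean_shutdown(self, broker_id: int) -> None:
        # Item 6.1: the member leaves the ELR and is remembered in LastKnownELR.
        if broker_id in self.elr:
            self.elr.remove(broker_id)
            self.last_known_elr.add(broker_id)

    def on_min_isr_update(self, new_min_isr: int) -> None:
        # Item 2: if the new min ISR <= current ISR size, the ELR is removed.
        self.min_isr = new_min_isr
        if new_min_isr <= len(self.isr):
            self.elr.clear()
        self._maybe_clean_last_known_elr()

    def on_isr_update(self, new_isr: Iterable[int]) -> None:
        # Only the LastKnownELR cleanup from item 6.1 is modeled here.
        self.isr = set(new_isr)
        self._maybe_clean_last_known_elr()

    def _maybe_clean_last_known_elr(self) -> None:
        # Item 6.1: LastKnownELR is cleaned when the ISR reaches the min ISR.
        if len(self.isr) >= self.min_isr:
            self.last_known_elr.clear()
```

For example, with min ISR 2, ISR {1} and ELR {2, 3}, an unclean shutdown of broker 2 leaves ELR {3} and LastKnownELR {2}; lowering min ISR to 1 then clears both sets.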

...

  1. During the shutdown, write the current broker epoch in the CleanShutdownFile.

  2. During startup, the broker will try to read the broker epoch from the CleanShutdownFile and put it in the broker registration request.

  3. The controller will compare the broker epoch in the request with the one in its registration record. If they are the same, it was a clean shutdown.

  4. If the broker shuts down before it receives the broker epoch, it will write -1.

Note, the CleanShutdownFile is removed after the log manager is initialized. It will be created and written when the log manager is shutting down.
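
The clean-shutdown check described above can be sketched as follows. This is a simplified illustration, not the broker implementation: the function names are hypothetical, the epoch is stored as a JSON number, and the file is deleted right after it is read (the text above says it is removed after the log manager is initialized).

```python
import json
import os

UNKNOWN_EPOCH = -1  # written when the broker never received a broker epoch


def write_clean_shutdown_file(path, broker_epoch):
    """On shutdown: persist the current broker epoch, or -1 if unknown."""
    epoch = UNKNOWN_EPOCH if broker_epoch is None else broker_epoch
    with open(path, "w") as f:
        json.dump({"version": 0, "BrokerEpoch": epoch}, f)


def read_previous_broker_epoch(path):
    """On startup: read the epoch to send in the registration request."""
    try:
        with open(path) as f:
            epoch = int(json.load(f)["BrokerEpoch"])
    except (FileNotFoundError, KeyError, TypeError, ValueError):
        # A missing or unreadable file is treated like an unclean shutdown.
        return UNKNOWN_EPOCH
    os.remove(path)  # assumption: removed immediately after reading
    return epoch


def is_clean_shutdown(previous_broker_epoch, registered_epoch):
    """Controller side: clean only if the reported epoch matches the record."""
    return (previous_broker_epoch != UNKNOWN_EPOCH
            and previous_broker_epoch == registered_epoch)
```

A broker that crashes never writes the file, so it reports -1 on the next registration and the controller treats the restart as an unclean shutdown.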

Unclean recovery

As the new proposal allows the ISR to be empty, the leader election strategy has to be reviewed.

...

{
  "apiKey":62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
...
// New fields begin.
    { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
      "about": "The epoch before a clean shutdown." }
// New fields end.
  ]
}

...

DescribeTopicPartitionsRequest (Coming with ELR)

Should be issued by admin clients. The broker will serve this request. As a part of the change, the admin client will start to use DescribeTopicPartitionsRequest to query the topic, instead of using the metadata requests.

On the other hand, the following changes may affect the client side.

  • The TopicPartitionInfo will also be updated to include the ELR info.
  • kafka-topics.sh does not have changes to its API but will have new fields in the output for the ELR, LastKnownELR, and LastKnownLeader.

ACL: Describe Topic

Limit: 20 topics max per request. If more than 20 topics are included, only the first 20 will be returned with info. Others will be returned with InvalidRequestError.

For more admin client related details, please refer to the Admin API/Client changes section.

The caller can list the topics of interest, or leave the field empty to request all of the topics.

Pagination.

This is a newly introduced behavior. The caller can specify the maximum number of partitions to be included in the response.

If there are more partitions than the limit, the remaining partitions and their topics will not be sent back. In this case, the Cursor field will be populated, and the caller can include this cursor in the next request.

Note,

  • There is also a server-side config, max.request.partition.size.limit, to control the maximum number of partitions to return.
  • There is no consistency guarantee between requests.
  • It is an admin client facing API, so topic IDs are not supported.
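
The pagination behavior described above can be sketched from the serving side. This is an illustration only, with several assumptions: the function name is hypothetical, topics are served in sorted name order, the topic set is modeled as a plain dict from topic name to partition count, and the effective limit is the smaller of the caller's limit and the server-side cap.

```python
def describe_page(topics, requested_limit, server_limit, cursor=None):
    """Return (partitions, next_cursor) for one page.

    topics: dict mapping topic name -> number of partitions.
    cursor: (topic_name, partition_index) to start from, or None.
    """
    limit = min(requested_limit, server_limit)
    if limit < 1:
        raise ValueError("the partition limit must be at least 1")
    start_topic, start_partition = cursor if cursor else (None, 0)
    page = []
    for name in sorted(topics):
        if start_topic is not None and name < start_topic:
            continue  # already served by an earlier page
        first = start_partition if name == start_topic else 0
        for index in range(first, topics[name]):
            if len(page) == limit:
                # The page is full: point the cursor at the first partition
                # that was left out.
                return page, (name, index)
            page.append((name, index))
    return page, None  # everything fit, so no cursor is returned
```

With topics {"a": 3, "b": 2} and a limit of 2, the first page holds a-0 and a-1 with cursor ("a", 2), the second holds a-2 and b-0 with cursor ("b", 1), and the third holds b-1 with no cursor.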
{
  "apiKey": 74,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeTopicPartitionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
      "about": "The topics to fetch details for.",
      "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name" }
      ]
    },
    { "name": "ResponsePartitionLimit", "type": "int32", "versions": "0+", "default": "2000",
      "about": "The maximum number of partitions included in the response." },
    { "name": "Cursor", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The first topic and partition index to fetch details for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name for the first topic to process" },
      { "name": "PartitionIndex", "type": "int32", "versions": "0+",
        "about": "The partition index to start with" }
    ]}
  ]
}

DescribeTopicsResponse

{
  "apiKey": 74,
  "type": "response",
  "name": "DescribeTopicPartitionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The topic error, or 0 if there was no error." },
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." },
      { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
        "about": "True if the topic is internal." },
      { "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
        "about": "Each partition in the topic.", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error, or 0 if there was no error." },
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the leader broker." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The leader epoch of this partition." },
        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of all nodes that host this partition." },
        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of nodes that are in sync with the leader for this partition." },
        { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
          "versions": "0+", "nullableVersions": "0+",
          "about": "The new eligible leader replicas otherwise." },
        { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
          "versions": "0+", "nullableVersions": "0+",
          "about": "The last known ELR." },
        { "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId",
          "about": "The set of offline replicas of this partition." }]},
      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
        "about": "32-bit bitfield to represent authorized operations for this topic." }]
    },
    { "name": "NextTopicPartition", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The next topic and partition index to fetch details for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name for the first topic to process" },
      { "name": "PartitionIndex", "type": "int32", "versions": "0+",
        "about": "The partition index to start with" }
    ]}
  ]
}

CleanShutdownFile (Coming with ELR)

It will be a JSON file.

{
  "version": 0,
  "BrokerEpoch": "xxx"
}

ElectLeadersRequest (Coming with Unclean Recovery)

ACL: CLUSTER_ACTION

Limit: 1000 partitions per request. If more than 1000 partitions are included, only the first 1000 will be served. Others will be returned with REQUEST_LIMIT_REACHED.

{
  "apiKey": 43,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "ElectLeadersRequest",
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
    ...
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
      "about": "The topic partitions to elect leaders.",
      "fields": [
        ...

// New fields begin. The same level with the Partitions
        { "name": "DesiredLeaders", "type": "[]int32", "versions": "3+", "nullableVersions": "3+",
          "about": "The desired leaders. The entry should match with the entry in Partitions by the index." },
// New fields end.
      ] },
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the election to complete." }
  ]
}

GetReplicaLogInfo Request (Coming with Unclean Recovery)

ACL: CLUSTER_ACTION

Limit: 2000 partitions per request. If more than 2000 partitions are included, only the first 2000 will be served. Others will be returned with REQUEST_LIMIT_REACHED.

{
  "apiKey":70,
  "type": "request",
  "listeners": ["broker"],
  "name": "GetReplicaLogInfoRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the broker." },
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
      "about": "The topic partitions to query the log info.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions of this topic whose leader should be elected." }
      ]}
  ]
}

GetReplicaLogInfo Response

{
  "apiKey":70,
  "type": "response",
  "name": "GetReplicaLogInfoResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "about": "The epoch for the broker." },
    { "name": "TopicPartitionLogInfoList", "type": "[]TopicPartitionLogInfo", "versions": "0+",
      "about": "The list of the log info.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
        { "name": "PartitionLogInfo", "type": "[]PartitionLogInfo", "versions": "0+", "about": "The log info of a partition.",
          "fields": [
            { "name": "Partition", "type": "int32", "versions": "0+", "about": "The id for the partition." },
            { "name": "LastWrittenLeaderEpoch", "type": "int32", "versions": "0+", "about": "The last written leader epoch in the log." },
            { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "about": "The current leader epoch for the partition from the broker point of view." },
            { "name": "LogEndOffset", "type": "int64", "versions": "0+", "about": "The log end offset for the partition." },
            { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The result error, or zero if there was no error."},
            { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The result message, or null if there was no error."}
          ]}
      ]}
  ]
}

kafka-leader-election.sh (Coming with Unclean Recovery)

...
// Updated field starts.
--election-type <[PREFERRED, UNCLEAN,    Type of election to attempt. Possible
  LONGEST_LOG_AGGRESSIVE,                  values are
  LONGEST_LOG_BALANCED, DESIGNATION]:      "preferred" for preferred leader election
  election type>                           or "unclean" for a random unclean leader election,
                                           or "longest_log_aggressive"/"longest_log_balanced" to choose the replica
                                           with the longest log
                                           or "designation" for electing the given replica ("desiredLeader") to be the leader.
                                         If preferred election is selected, the
                                           election is only performed if the
                                           current leader is not the preferred
                                           leader for the topic partition. If
                                           longest_log_aggressive/longest_log_balanced/designation
                                           election is selected, the election is
                                           only performed if there is no leader
                                           for the topic partition. REQUIRED.
--path-to-json-file <String: Path to     The JSON file with the list of
  JSON file>                               partitions for which leader elections
                                           should be performed. This is an
                                           example format. The desiredLeader field
                                           is only required in DESIGNATION election.
                                         {"partitions":
                                          [{"topic": "foo", "partition": 1, "desiredLeader": 0},
                                           {"topic": "foobar", "partition": 2, "desiredLeader": 1}]
                                         }
                                         Not allowed if --all-topic-partitions
                                           or --topic flags are specified.
// Updated field ends.

Config changes

The following new configs are introduced for ELR.

  1. eligible.leader.replicas.enabled. It controls whether the controller will record the ELR-related metadata and whether the ISR can be empty. The default value is false. The default is expected to become true in the future.
  2. max.request.partition.size.limit. The maximum number of partitions to return in an API response.

The new configs are introduced for Unclean Recovery.

  1. unclean.recovery.strategy. Described in the above section. Balanced is the default value. 
  2. unclean.recovery.manager.enabled. True for using the unclean recovery manager to perform an unclean recovery. False means the random election will be used in the unclean leader election. False is the default value.
  3. unclean.recovery.timeout.ms. The time limit for waiting for the replicas' responses during the Unclean Recovery. The default value is 5 minutes.

New Errors

REQUEST_LIMIT_REACHED

As we introduce the request limit for the new APIs, the items sent in the request but over the limit will be returned with REQUEST_LIMIT_REACHED. It is a retriable error.
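
The limit handling described above can be sketched as follows. This is a hedged illustration, not the broker code: the helper names are hypothetical, and a request is modeled as a plain list of items (topics or partitions).

```python
REQUEST_LIMIT_REACHED = "REQUEST_LIMIT_REACHED"


def apply_request_limit(items, limit):
    """Server side: serve the first `limit` items and flag the rest."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    served = list(items[:limit])
    rejected = {item: REQUEST_LIMIT_REACHED for item in items[limit:]}
    return served, rejected


def send_until_done(items, limit, handle):
    """Client side: because the error is retriable, resend the rejected items."""
    pending = list(items)
    while pending:
        served, rejected = apply_request_limit(pending, limit)
        for item in served:
            handle(item)
        pending = list(rejected)  # dicts keep insertion order
```

Because at least one item is served per round trip, the retry loop always terminates.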

Admin API/Client changes

The admin client will start to use the DescribeTopicsRequest to describe the topic.

  1. The client will split a large request into suitably sized pieces and send them one after another if the number of requested topics reaches the limit.
  2. The client will query again for the remaining topics if the response it received has the Cursor field populated.
  3. The output of the topic describe will be updated with the ELR-related fields.
  4. TopicPartitionInfo will be updated to include the ELR-related fields.
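
The first two client behaviors in the list above could look roughly like the sketch below. It is not the admin client implementation: `send` is a hypothetical callable standing in for one request/response round trip, returning the partitions in the response and the cursor (or None), and the topic limit is passed in as a parameter.

```python
def chunks(items, size):
    """Split a list into consecutive pieces of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def describe_topics(topic_names, send, max_topics_per_request):
    """Describe all topics, batching requests and following the cursor."""
    results = []
    for batch in chunks(sorted(topic_names), max_topics_per_request):
        cursor = None
        while True:
            partitions, cursor = send(batch, cursor)
            results.extend(partitions)
            if cursor is None:
                break  # this batch is fully described
    return results
```

Since there is no consistency guarantee between requests, the combined result may mix partition states observed at different times.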

...

Metrics

The following metrics will be added for ELR

...

  • A min.insync.replicas value larger than the replication factor will no longer take effect. For existing configs, the effective min.insync.replicas will be min(min.insync.replicas, replication factor).

  • Cluster admins should update min.insync.replicas to 1 if they want replication to keep going when only the leader is in the ISR.

  • Note that, this new requirement is not guarded by any feature flags/Metadata version.
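
As a small worked example of the capping rule in the first bullet above (the function name is hypothetical):

```python
def effective_min_isr(configured_min_isr, replication_factor):
    """min.insync.replicas is capped at the replication factor."""
    return min(configured_min_isr, replication_factor)
```

For instance, a topic with replication factor 2 and min.insync.replicas set to 3 behaves as if min.insync.replicas were 2, while a topic with replication factor 3 and min.insync.replicas set to 2 is unaffected.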

ELR

ELR will be guarded by a new metadata version (MV) and the eligible.leader.replicas.enabled config, so it is not enabled during the rolling upgrade.

After the controller picks up the new MV and eligible.leader.replicas.enabled is true, when it loads the partition states it will populate the ELR as empty if the PartitionChangeRecord uses an old version. On the next partition update, the controller will record the current ELR.

Note that the leader can only allow an empty ISR once the new metadata version is in effect.

MV downgrade: Once the MV is downgraded, all the ELR-related fields will be removed on the next partition change. The controller will also ignore the ELR fields.

Software downgrade: After the MV is downgraded, a metadata delta will be generated to capture the current status. Then the user can start the software downgrade.

Clean shutdown

It will be guarded by a new metadata version. Before that, the controller will treat all registrations as unclean shutdowns.

...