Status

Current state: Accepted

Discussion thread: here

Vote thread: here

...

  1. A change to the ELR does not require a leader epoch bump. In most cases, the ELR is updated along with ISR changes. The only case where the ELR changes alone is when an ELR broker registers after an unclean shutdown, and in that case there is no need to bump the leader epoch.

  2. When the min.insync.replicas config is updated, if the new min ISR <= the current ISR size, the ELR will be removed.

  3. A new metric of Electable leaders will be added. It reflects the count of (ISR + ELR).

  4. AlterPartitionReassignments. The leader updates the ISR implicitly with AlterPartition requests. The controller will make sure that, upon completion, the ELR only contains replicas in the final replica set. Additionally, to improve the durability of the reassignment:

    1. In the current behavior, all the adding replicas must be in the ISR when the reassignment completes. This can leave only 1 replica in the ISR. The ELR may not help here either, because the removing ISR replicas cannot stay in the ELR once the reassignment completes. So we propose to enforce that the reassignment can only be completed if the ISR size is larger than or equal to min ISR.
    2. This min ISR requirement is also enforced when the reassignment is canceled.
  5. Have a new admin API, DescribeTopicsRequest, for showing the topic details. We don't want to embed the ELR info in the Metadata API, because the ELR is not a detail that needs to be exposed to user clients.

    1. More public-facing details will be discussed in the DescribeTopicsRequest section.
  6. We also record the last-known ELR members.

    1. When an ELR member has an unclean shutdown, it will be removed from the ELR and added to the LastKnownELR. The LastKnownELR will be cleaned when the ISR reaches the min ISR.

    2. LastKnownELR is stored in the metadata log.

    3. LastKnownELR will also be useful in the Unclean Recovery section.

  7. The last known leader will be tracked.
    1. This can be used if Unclean Recovery is not enabled. More details will be discussed in the Deliver Plan.
    2. The controller will record the last ISR member (the leader) when it is fenced.
    3. It will be cleaned when a new leader is elected.
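
The bookkeeping rules in items 2, 4.1, and 6.1 above can be sketched as a toy model. This is only an illustration under stated assumptions: the class `PartitionState` and its method names are hypothetical and are not part of the controller code, and the sketch ignores leader epochs, fencing, and the metadata log.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionState:
    """Hypothetical toy model of the per-partition sets tracked by the controller."""
    min_isr: int
    isr: set = field(default_factory=set)
    elr: set = field(default_factory=set)
    last_known_elr: set = field(default_factory=set)

    def on_unclean_shutdown(self, broker_id: int) -> None:
        # Item 6.1: an ELR member that shuts down uncleanly is moved
        # from the ELR to the LastKnownELR.
        if broker_id in self.elr:
            self.elr.discard(broker_id)
            self.last_known_elr.add(broker_id)

    def on_isr_change(self, new_isr: set) -> None:
        self.isr = set(new_isr)
        # Item 6.1: the LastKnownELR is cleaned once the ISR reaches min ISR.
        if len(self.isr) >= self.min_isr:
            self.last_known_elr.clear()

    def on_min_isr_update(self, new_min_isr: int) -> None:
        self.min_isr = new_min_isr
        # Item 2: if the new min ISR <= current ISR size, the ELR is removed.
        if new_min_isr <= len(self.isr):
            self.elr.clear()

    def can_complete_reassignment(self) -> bool:
        # Item 4.1: completion is allowed only if ISR size >= min ISR.
        return len(self.isr) >= self.min_isr
```

For example, with `PartitionState(min_isr=2, isr={1}, elr={2, 3})`, calling `on_unclean_shutdown(3)` leaves broker 2 in the ELR and moves broker 3 to the LastKnownELR, and `can_complete_reassignment()` stays false until the ISR grows to two members.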

...

{
  "apiKey":62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
...
// New fields begin.
    { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
      "about": "The epoch before a clean shutdown." }
// New fields end.
  ]
}

...

DescribeTopicPartitionsRequest (Coming with ELR)

Should be issued by admin clients. For more admin client related details, please refer to the Admin API/Client changes.

ACL: Describe Topic

Limit: 1000 partitions max per response.

The caller can query the partitions starting from a first partition id (-1 means fetch all the partitions; other values mean fetch the partitions at and after this index).

If the server finds more than 1000 partitions to include, only the first 1000 (in alphabetical order) will be returned with info.

  1. If a part of the partitions for a topic can't be returned, the first partition not returned will be specified in the next partition field. The rest of the partitions will not show up in the response.
  2. If the quota of the 1000 partitions has been used up, the rest of the topics will be returned with REQUEST_LIMIT_REACHED.

Note, the request can have a mix of partition-specific topics and range-query topics.

The caller can list the topics of interest, or leave the field empty to request all of the topics.

Pagination.

This is a newly introduced behavior. The caller can specify the maximum number of partitions to be included in the response.

If there are more partitions than the limit, the partitions beyond the limit and their topics will not be sent back. In this case, the Cursor field will be populated. The caller can include this cursor in the next request.

Note,

  • There is also a server-side config, max.request.partition.size.limit, to control the maximum number of partitions to return.
  • There is no consistency guarantee between requests.
  • It is an admin-client-facing API, so topic IDs are not supported.
{
  "apiKey": 74,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeTopicPartitionsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
      "about": "The topics to fetch details for.",
      "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The topic name", "entityType": "topicName"}
      ]
    },
    { "name": "ResponsePartitionLimit", "type": "int32", "versions": "0+", "default": "2000",
      "about": "The maximum number of partitions included in the response." },
    { "name": "Cursor", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The first topic and partition index to fetch details for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The name for the first topic to process", "entityType": "topicName"},
      { "name": "PartitionIndex", "type": "int32", "versions": "0+",
        "about": "The partition index to start with"}
    ]}
  ]
}
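
The pagination contract described above (a partition limit plus a cursor naming the first topic and partition not yet returned) can be sketched with a small in-memory simulation. This is a hedged illustration, not the broker or admin client implementation: the `TOPICS` table and the functions `describe_topic_partitions` and `describe_all` are hypothetical, and the sketch assumes topics are walked in sorted name order.

```python
# Hypothetical in-memory stand-in for the broker side: topic name -> partition count.
TOPICS = {"a": 3, "b": 2, "c": 4}


def describe_topic_partitions(topics, limit, cursor=None):
    """Return (partitions, next_cursor) with at most `limit` partitions per call.

    `cursor` is a (topic_name, partition_index) pair, or None for the first request.
    An empty or missing `topics` list means "all topics".
    """
    names = sorted(topics) if topics else sorted(TOPICS)
    result = []
    for name in names:
        start = 0
        if cursor is not None:
            if name < cursor[0]:
                continue  # fully returned by an earlier response
            if name == cursor[0]:
                start = cursor[1]
        for index in range(start, TOPICS[name]):
            if len(result) == limit:
                # Limit reached: report where the next request should start.
                return result, (name, index)
            result.append((name, index))
    return result, None


def describe_all(topics=None, limit=4):
    """Client loop: resend the returned cursor until the response carries none."""
    collected, cursor = [], None
    while True:
        page, cursor = describe_topic_partitions(topics, limit, cursor)
        collected.extend(page)
        if cursor is None:
            return collected
```

Because there is no consistency guarantee between requests, a real caller should expect that topics created or deleted between pages may be missed or partially reported; the sketch sidesteps this by keeping `TOPICS` fixed.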

DescribeTopicsResponse

{
  "apiKey": 74,
  "type": "response",
  "name": "DescribeTopicPartitionsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The topic error, or 0 if there was no error." },
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." },
      { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
        "about": "True if the topic is internal." },
      { "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
        "about": "Each partition in the topic.", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error, or 0 if there was no error." },
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the leader broker." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The leader epoch of this partition." },
        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of all nodes that host this partition." },
        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of nodes that are in sync with the leader for this partition." },
        { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
          "versions": "0+", "nullableVersions": "0+",
          "about": "The new eligible leader replicas otherwise." },
        { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
          "versions": "0+", "nullableVersions": "0+",
          "about": "The last known ELR." },
        { "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId",
          "about": "The set of offline replicas of this partition." }]},
      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
        "about": "32-bit bitfield to represent authorized operations for this topic." }]
    },
    { "name": "NextTopicPartition", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The next topic and partition index to fetch details for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The name for the first topic to process", "entityType": "topicName"},
      { "name": "PartitionIndex", "type": "int32", "versions": "0+",
        "about": "The partition index to start with"}
    ]}
  ]
}

CleanShutdownFile (Coming with ELR)

...

...
// Updated field starts.
--election-type <[PREFERRED, UNCLEAN,   Type of election to attempt. Possible
  LONGEST_LOG_AGGRESSIVE,                 values are "preferred" for preferred
  LONGEST_LOG_BALANCED, DESIGNATION]:     leader election, "unclean" for a
  election type>                          random unclean leader election,
                                          "longest_log_aggressive" or
                                          "longest_log_balanced" to choose the
                                          replica with the longest log, or
                                          "designation" for electing the given
                                          replica ("desiredLeader") to be the
                                          leader. If preferred election is
                                          selected, the election is only
                                          performed if the current leader is
                                          not the preferred leader for the
                                          topic partition. If
                                          longest_log_aggressive,
                                          longest_log_balanced, or designation
                                          election is selected, the election
                                          is only performed if there is no
                                          leader for the topic partition.
                                          REQUIRED.
--path-to-json-file <String: Path to    The JSON file with the list of
  JSON file>                              partitions for which leader elections
                                          should be performed. This is an
                                          example format. The desiredLeader
                                          field is only required in
                                          DESIGNATION election.
                                          {"partitions":
                                                  [{"topic": "foo", "partition": 1, "desiredLeader": 0},
                                                   {"topic": "foobar", "partition": 2, "desiredLeader": 1}]
                                          }
                                          Not allowed if --all-topic-partitions
                                          or --topic flags are specified.
// Updated field ends.
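
As an illustration of the JSON file format described in the help text above, the following sketch builds the file contents and enforces that desiredLeader is present only for a DESIGNATION election. The helper `build_election_file` is hypothetical and is not part of any Kafka tool; it only mirrors the example format shown in the help text.

```python
import json


def build_election_file(entries, election_type):
    """Build the --path-to-json-file payload as a JSON string.

    `entries` is a list of (topic, partition, desired_leader) tuples.
    desired_leader may be None for every election type except DESIGNATION.
    """
    partitions = []
    for topic, partition, desired_leader in entries:
        item = {"topic": topic, "partition": partition}
        if election_type.upper() == "DESIGNATION":
            if desired_leader is None:
                raise ValueError(f"desiredLeader is required for {topic}-{partition}")
            item["desiredLeader"] = desired_leader
        partitions.append(item)
    return json.dumps({"partitions": partitions})
```

Calling `build_election_file([("foo", 1, 0), ("foobar", 2, 1)], "designation")` yields the same structure as the example in the help text, while other election types simply omit the desiredLeader field.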

...


Config changes

The following new configs are introduced for ELR:

  1. eligible.leader.replicas.enabled. It controls whether the controller will record the ELR-related metadata and whether the ISR can be empty. The default value is false; it will become true in the future.
  2. max.request.partition.size.limit. The maximum number of partitions to return in an API response.

The new configs are introduced for Unclean Recovery.

...

The admin client will start to use the DescribeTopicsRequest to describe the topic.

  1. The client will split a large request into properly sized pieces and send them one after another if the requested topic count reaches the limit.
  2. The client will continue querying the remaining topics if it receives a response with the Cursor field populated.
  3. The output of the topic describe will be updated with the ELR related fields.
  4. TopicPartitionInfo will be updated to include the ELR related fields.

...

  • A min.insync.replicas value larger than the replication factor will no longer take effect. For existing configs, the effective min.insync.replicas will be min(min.insync.replicas, replication factor).

  • Cluster admins should update min.insync.replicas to 1 if they want replication to keep going when only the leader is in the ISR.

  • Note that this new requirement is not guarded by any feature flag or metadata version.
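
The clamping rule above is simple enough to state as a one-line function. This is a minimal sketch; the function name `effective_min_isr` is hypothetical and only restates the min(min.insync.replicas, replication factor) formula from the text.

```python
def effective_min_isr(min_insync_replicas: int, replication_factor: int) -> int:
    """Return the min ISR that actually applies to a partition.

    The configured value is capped at the replication factor, as described
    in the text above.
    """
    return min(min_insync_replicas, replication_factor)
```

For instance, a topic with replication factor 2 and min.insync.replicas set to 3 behaves as if min.insync.replicas were 2.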

ELR

It will be guarded by a new metadata version (MV) and the eligible.leader.replicas.enabled config, so it is not enabled during the rolling upgrade.

After the controller picks up the new MV and eligible.leader.replicas.enabled is true, when it loads the partition states, it will populate the ELR as empty if the PartitionChangeRecord uses an old version. In the next partition update, the controller will record the current ELR. Note that the leader can only allow an empty ISR after the new metadata version is in effect.

MV downgrade: Once the MV is downgraded, all the ELR-related fields will be removed on the next partition change. The controller will also ignore the ELR fields.

...