...

  1. The kafka-leader-election.sh tool will be upgraded to allow manual leader election.

    1. It can directly select a leader.

    2. It can trigger an unclean recovery for the replica with the longest log in either Aggressive or Balanced mode.

  2. Configs to update. Please refer to the Config Changes section.

    1. unclean.recovery.strategy. Described in the above section. Balanced is the default value.
    2. unclean.recovery.manager.enabled. True to use the unclean recovery manager to perform an unclean recovery, False otherwise. False is the default value.
    3. unclean.recovery.timeout.ms. The time limit for waiting for the replicas' responses during the Unclean Recovery. 5 min is the default value.
    For compatibility, the original unclean.leader.election.enable options True/False will be mapped to unclean.recovery.strategy options.
    1. unclean.leader.election.enable.false -> unclean.recovery.strategy.Balanced
    2. unclean.leader.election.enable.true -> unclean.recovery.strategy.Aggressive
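
The compatibility mapping above can be sketched as a small lookup. This is only an illustration: the function names and the accepted string spellings are assumptions, not part of the proposal.

```python
# Sketch only: maps the legacy unclean.leader.election.enable value onto the
# unclean.recovery.strategy names listed above. Function names are hypothetical.

def parse_legacy_value(raw: str) -> bool:
    """Parse the string form of unclean.leader.election.enable."""
    value = raw.strip().lower()
    if value not in ("true", "false"):
        raise ValueError(f"expected true/false, got {raw!r}")
    return value == "true"


def map_legacy_unclean_election(enable: bool) -> str:
    """Return the unclean.recovery.strategy value for a legacy setting."""
    # false -> Balanced, true -> Aggressive, as stated in the mapping above.
    return "Aggressive" if enable else "Balanced"
```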

...

CleanShutdownFile (Coming with ELR)

It will be a JSON file.

{
  "version": 0,
  "BrokerEpoch": "xxx"
}
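
As a rough illustration of how such a file might be produced and consumed, the sketch below writes and reads the two fields shown above. The helper names, the write-then-rename step, and the choice to store the epoch as a string (mirroring the quoted "xxx" placeholder) are assumptions, not something the proposal specifies.

```python
import json
import os
import tempfile


def write_clean_shutdown_file(path: str, broker_epoch: int) -> None:
    """Write the clean shutdown file with the two fields shown above."""
    # Assumption: the epoch is serialized as a string, like the "xxx" placeholder.
    payload = {"version": 0, "BrokerEpoch": str(broker_epoch)}
    directory = os.path.dirname(path) or "."
    # Assumption: write to a temp file and rename, so a crash mid-write
    # does not leave a partial file behind.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump(payload, handle)
    os.replace(tmp_path, path)


def read_clean_shutdown_file(path: str):
    """Return the broker epoch as an int, or None if the file is missing or unreadable."""
    try:
        with open(path) as handle:
            payload = json.load(handle)
        return int(payload["BrokerEpoch"])
    except (OSError, ValueError, KeyError, TypeError):
        return None
```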

ElectLeadersRequest (Coming with Unclean Recovery)

...

Limit: 20 topics per request. If more than 20 topics are included, only the first 20 will be served. Others will be returned with InvalidRequestError.
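
A caller can stay under this limit by batching topics before sending. The sketch below also shows one way to keep DesiredLeaders aligned by index with Partitions, as the schema that follows requires. The helper names and the "Topic" key are assumptions made for illustration; only Partitions and DesiredLeaders appear in this section.

```python
MAX_TOPICS_PER_REQUEST = 20  # limit stated in the text above


def build_topic_entry(topic, desired_leader_by_partition):
    """Build one TopicPartitions entry with index-aligned DesiredLeaders.

    desired_leader_by_partition maps partition id -> desired leader broker id.
    """
    partitions = sorted(desired_leader_by_partition)
    return {
        "Topic": topic,  # assumed field name, not shown in this section
        "Partitions": partitions,
        # DesiredLeaders[i] is the desired leader for Partitions[i].
        "DesiredLeaders": [desired_leader_by_partition[p] for p in partitions],
    }


def batch_topic_entries(entries, limit=MAX_TOPICS_PER_REQUEST):
    """Split topic entries into batches of at most `limit` topics each."""
    return [entries[i:i + limit] for i in range(0, len(entries), limit)]
```

Batching on the client side avoids relying on the "first 20 are served" behavior, at the cost of sending several requests.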

{
  "apiKey": 43,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "ElectLeadersRequest",
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
  ...
  { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
    "about": "The topic partitions to elect leaders.",
    "fields": [
    ...

// New fields begin.
      { "name": "DesiredLeaders", "type": "[]int32", "versions": "3+", "nullableVersions": "3+",
        "about": "The desired leaders. Each entry should match the entry in Partitions at the same index." }
// New fields end.
    ]
  },
  { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
    "about": "The time in ms to wait for the election to complete." }
  ]
}

...

Limit: 2000 partitions per request. If more than 2000 partitions are included, only the first 2000 will be served. Others will be returned with InvalidRequestError.

{
  "apiKey":70,
  "type": "request",
  "listeners": ["broker"],
  "name": "GetReplicaLogInfoRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", 
        "about": "The ID of the broker." },
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
    "about": "The topic partitions to query the log info.",
    "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of this topic whose log info is queried." }
    ]}
] }
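
The 2000-partition limit described above could be applied on the receiving side roughly as follows. This is a sketch under assumptions: the function name is hypothetical, and counting partitions in request order across topics is one plausible reading of "the first 2000".

```python
MAX_PARTITIONS_PER_REQUEST = 2000  # limit stated in the text above


def split_partitions(topic_partitions, limit=MAX_PARTITIONS_PER_REQUEST):
    """Split requested partitions into served and rejected lists.

    topic_partitions is a list of (topic_id, [partition, ...]) in request order.
    Returns (served, rejected), each a list of (topic_id, partition) pairs.
    """
    served, rejected = [], []
    for topic_id, partitions in topic_partitions:
        for partition in partitions:
            # Partitions beyond the limit would be answered with an error.
            target = served if len(served) < limit else rejected
            target.append((topic_id, partition))
    return served, rejected
```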

GetReplicaLogInfo Response

{
  "apiKey":70,
  "type": "response",
  "name": "GetReplicaLogInfoResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "about": "The epoch for the broker." },
    { "name": "TopicPartitionLogInfoList", "type": "[]TopicPartitionLogInfo", "versions": "0+",
      "about": "The list of the log info.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID." },
        { "name": "PartitionLogInfo", "type": "[]PartitionLogInfo", "versions": "0+", "about": "The log info of a partition.",
          "fields": [
            { "name": "Partition", "type": "int32", "versions": "0+", "about": "The id for the partition." },
            { "name": "LastWrittenLeaderEpoch", "type": "int32", "versions": "0+", "about": "The last written leader epoch in the log." },
            { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "about": "The current leader epoch for the partition from the broker point of view." },
            { "name": "LogEndOffset", "type": "int64", "versions": "0+", "about": "The log end offset for the partition." },
            { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The result error, or zero if there was no error." },
            { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The result message, or null if there was no error." }
          ]}
      ]}
  ] }
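
To make "the replica with the longest log" concrete, the sketch below ranks per-replica responses using the fields above. The ordering (highest LastWrittenLeaderEpoch first, then highest LogEndOffset), the tie-break on the lowest broker id, and all names are assumptions for illustration, not a statement of how the controller decides.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class ReplicaLogInfo:
    broker_id: int
    last_written_leader_epoch: int
    log_end_offset: int
    error_code: int = 0  # zero means no error, as in the schema above


def pick_longest_log(replicas: Iterable[ReplicaLogInfo]) -> Optional[int]:
    """Return the broker id with the longest log, or None if no usable response."""
    candidates = [r for r in replicas if r.error_code == 0]
    if not candidates:
        return None
    # Assumed ordering: epoch, then log end offset, then lowest broker id.
    best = max(
        candidates,
        key=lambda r: (r.last_written_leader_epoch, r.log_end_offset, -r.broker_id),
    )
    return best.broker_id
```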


kafka-leader-election.sh (Coming with Unclean Recovery)

...
// Updated field starts.
--election-type <[PREFERRED, UNCLEAN,   Type of election to attempt. Possible
  LONGEST_LOG_AGGRESSIVE,                 values are "preferred" for preferred
  LONGEST_LOG_BALANCED, DESIGNATION]:     leader election, "unclean" for a
  election type>                          random unclean leader election,
                                          "longest_log_aggressive" or
                                          "longest_log_balanced" to choose the
                                          replica with the longest log, or
                                          "designation" to elect the given
                                          replica ("desiredLeader") as the
                                          leader. If preferred election is
                                          selected, the election is only
                                          performed if the current leader is
                                          not the preferred leader for the
                                          topic partition. If
                                          longest_log_aggressive,
                                          longest_log_balanced, or designation
                                          election is selected, the election
                                          is only performed if there is no
                                          leader for the topic partition.
                                          REQUIRED.
--path-to-json-file <String: Path to    The JSON file with the list of
  JSON file>                              partitions for which leader elections
                                          should be performed. This is an
                                          example format. The desiredLeader
                                          field is only required in
                                          DESIGNATION election.
                                        {"partitions":
                                          [{"topic": "foo", "partition": 1, "desiredLeader": 0},
                                           {"topic": "foobar", "partition": 2, "desiredLeader": 1}]
                                        }
                                        Not allowed if --all-topic-partitions
                                          or --topic flags are specified.
// Updated field ends.
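
The JSON file for --path-to-json-file can be generated rather than written by hand. The sketch below produces the example format shown above; the helper name and its parameters are hypothetical.

```python
import json


def build_election_json(assignments, include_desired_leader=True):
    """Serialize partitions in the example format shown above.

    assignments is an iterable of (topic, partition, desired_leader) tuples.
    desiredLeader is only needed for a DESIGNATION election, so it can be omitted.
    """
    entries = []
    for topic, partition, desired_leader in assignments:
        entry = {"topic": topic, "partition": partition}
        if include_desired_leader:
            entry["desiredLeader"] = desired_leader
        entries.append(entry)
    return json.dumps({"partitions": entries})
```

For election types other than DESIGNATION, passing include_desired_leader=False yields the file without the desiredLeader field.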

Config changes

The following new configs are introduced for Unclean Recovery.

  1. unclean.recovery.strategy. Described in the above section. Balanced is the default value.
  2. unclean.recovery.manager.enabled. True to use the unclean recovery manager to perform an unclean recovery. False means random election will be used for the unclean leader election. False is the default value.
  3. unclean.recovery.timeout.ms. The time limit for waiting for the replicas' responses during the Unclean Recovery. 5 min is the default value.
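
The defaults listed above can be summarized as a small resolver that overlays user-supplied values on them. The function name and the rejection of unknown keys are assumptions; only the three config names and their defaults come from the list.

```python
# Defaults as stated in the list above (5 min expressed in milliseconds).
UNCLEAN_RECOVERY_DEFAULTS = {
    "unclean.recovery.strategy": "Balanced",
    "unclean.recovery.manager.enabled": False,
    "unclean.recovery.timeout.ms": 5 * 60 * 1000,
}


def resolve_unclean_recovery_configs(overrides):
    """Return the effective configs: defaults overlaid with the given overrides."""
    unknown = sorted(set(overrides) - set(UNCLEAN_RECOVERY_DEFAULTS))
    if unknown:
        # Assumption: this sketch rejects keys outside the three listed configs.
        raise KeyError(f"unknown config(s): {unknown}")
    resolved = dict(UNCLEAN_RECOVERY_DEFAULTS)
    resolved.update(overrides)
    return resolved
```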

Metrics

The following metrics will be added for ELR

...