Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Updates according to Jun's comments

...

The URM manages the recovery process for a leaderless partition. This new unclean recovery process takes the place of the unclean leader election. Instead of electing a random unfenced replica as the leader, the URM will query the log end offset and the leader epoch from each unfenced replica. The one with the highest leader epoch and the longest log end offset will be the new leader.

...

  1. Only the unfenced replicas can be counted into the quorum. So when a replica gets unfenced, URM should check if it can be elected.The URM will query all the replicas including the fenced replicas.

  2. In case of any unforeseen failures that the URM stops the retry to recover or the task hangs for a long time, the controller will trigger the recovery again when handling heartbeats.

...

  1. The kafka-leader-election.sh tool will be upgraded to allow manual leader election.

    1. It can directly select a leader.

    2. It can trigger an unclean recovery for the replica with the longest log in either Proactive or Balance mode.

  2. The current partition reassignment can be finished as long as the added replicas are in the ISR.

Public Interfaces

We will deliver the KIP in phases, so the API changes are also marked coming with either ELR or Unclean Recovery.

PartitionChangeRecord (coming with ELR)

{
  "apiKey": 5,
  "type": "metadata",
  "name": "PartitionChangeRecord",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Isr", "type":  "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
    { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
      "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,
      "about": "null if the replicas didn't change; the new replicas otherwise." },
    { "name": "RemovingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 3,
      "about": "null if the removing replicas didn't change; the new removing replicas otherwise." },
    { "name": "AddingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
      "about": "null if the adding replicas didn't change; the new adding replicas otherwise." },
    { "name": "LeaderRecoveryState", "type": "int8", "default": "-1", "versions": "0+", "taggedVersions": "0+", "tag": 5,
      "about": "-1 if it didn't change; 0 if the leader was elected from the ISR or recovered from an unclean election; 1 if the leader that was elected using unclean leader election and it is still recovering." },
// New fields begin.
    { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 6,
      "about": "null if the ELR didn't change; the new eligible leader replicas otherwise." }
    { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 7,
      "about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." }
// New fields end.
  ]
}

PartitionRecord (coming with ELR)

{
  "apiKey": 3,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderRecoveryState", "type": "int8", "default": "0", "versions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "1 if the partition is recovering from an unclean leader election; 0 otherwise." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The epoch of the partition leader." },
    { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change anything in the partition." },
// New fields begin.
    { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 1,
    "about": "The eligible leader replicas of this partition." } { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",       "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 2,
     "about": "The last known eligible leader replicas of this partition." } // New fields end. ] }

BrokerRegistration API (coming with ELR)

{
  "apiKey":62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID." },
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The cluster id of the broker process." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." },
    { "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
      "about": "If the required configurations for ZK migration are present, this value is set to true" },
// New fields begin.
    { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
      "about": "The epoch before a clean shutdown." }
// New fields end.
  ]
}

DescribeTopicRequest (Coming with ELR)

Should be issued by admin clients or brokers. The controller will serve this request.

...

{
  "apiKey":69,
  "type": "request",
  "name": "DescribeTopicResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]MetadataResponseTopic", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The topic error, or 0 if there was no error." },
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." },
      { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
        "about": "True if the topic is internal." },
      { "name": "Partitions", "type": "[]MetadataResponsePartition", "versions": "0+",
        "about": "Each partition in the topic.", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error, or 0 if there was no error." },
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the leader broker." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The leader epoch of this partition." },
        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of all nodes that host this partition." },
        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of nodes that are in sync with the leader for this partition." },
{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",          "versions": "0+", "nullableVersions": "0+",          "about": "null if the ELR didn't change; the new eligible leader replicas otherwise." }, { "name": "LastKnownLeaderLastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", "versions": "0+", "nullableVersions": "0+", "about": "The last leader beforenull if the LastKnownELR didn't change. Otherwise, the partitionlast becomesknown leaderlessELR." },
       { "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId",           "about": "The set of offline replicas of this partition." }       ]} ] }

CleanShutdownFile (Coming with ELR)

{ "BrokerEpoch":"xxx"}

ElectLeadersRequest (Coming with Unclean Recovery)

{
  "apiKey": 43,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "ElectLeadersRequest",
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
  { "name": "ElectionType", "type": "int8", "versions": "1+",
    "about": "Type of elections to conduct for the partition. A value of '0' elects the preferred replica. A value of '1' elects the desired replicas.
    A value of '2' elects the replica with the longest log among the last known ELRs. A value of '3' elects the replica with the longest log among all the replicas that can reply within a fixed time." },
  { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
    "about": "The topic partitions to elect leaders.",
    "fields": [
  { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
    "about": "The name of a topic." },
  { "name": "Partitions", "type": "[]int32", "versions": "0+",
    "about": "The partitions of this topic whose leader should be elected." },

// New fields begin.
  { "name": "DesiredLeaders", "type": "[]int32", "versions": "3+",
      "about": "The desired leaders. The entry should matches with the entry in Partitions by the index." },
  },
// New fields end.

  ] },
  { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
    "about": "The time in ms to wait for the election to complete." }
  ] 
}

GetReplicaLogInfo Request (Coming with Unclean Recovery)

ACL: Read Topic CLUSTER_ACTION

{
  "apiKey":70,
  "type": "request",
  "listeners": ["broker"],
  "name": "GetReplicaLogInfoRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", 
        "about": "The ID of the broker." },
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
    "about": "The topic partitions to elect leaders.",
    "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of this topic whose leader should be elected." },
    ]}
] }

...

{
  "apiKey":70,
  "type": "response",
  "name": "GetReplicaLogInfoResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "about": "The epoch for the broker." }
    { "name": "LogInfoList", "type": "[]LogInfo", "versions": "0+", 
    "about": "The list of the log info.",
    "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partition", "type": "int32", "versions": "0+", "about": "The id for the partition." },
      { "name": "LastWrittenLeaderEpoch", "type": "int32", "versions": "0+", "about": "The last written leader epoch in the log." },
      { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "about": "The current leader epoch for the partition from the broker point of view." },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+", "about": "The log end offset for the partition." }
    ]}
] }

kafka-leader-election.sh (Coming with Unclean Recovery)

--admin.config <String: config file>    Configuration properties files to pass
                                          to the admin client
--all-topic-partitions                  Perform election on all of the
                                          eligible topic partitions based on
                                          the type of election (see the --
                                          election-type flag). Not allowed if
                                          --topic or --path-to-json-file is
                                          specified.
--bootstrap-server <String: host:port>  A hostname and port for the broker to
                                          connect to, in the form host:port.
                                          Multiple comma separated URLs can be
                                          given. REQUIRED.
// Updated field starts.
--election-type <[PREFERRED, UNCLEAN, LONGEST_LOG_PROACTIVE, LONGEST_LOG_BALANCED, DESIGNATION]:                
                                          Type of election to attempt. Possible
  election type>                          values are "preferred" for preferred
                                          leader election, or "unclean" for
                                          a random unclean leader election,
                                          or "longest_log_proactive"/"longest_log_balanced"
                                          to choose the replica 
                                          with the longest log or "designation" for
                                          electing the given replica to be the leader. If
                                          preferred election is selection, the
                                          election is only performed if the
                                          current leader is not the preferred
                                          leader for the topic partition. If
                                          longest_log_proactive/longest_log_balanced/designation 
                                          election is selected, the
                                          election is only performed if there
                                          are no leader for the topic
                                          partition. REQUIRED.
// Updated field ends.
--help                                  Print usage information.
--partition <Integer: partition id>     Partition id for which to perform an
                                          election. REQUIRED if --topic is
                                          specified.
                                          
// Updated field starts.                                        
--path-to-json-file <String: Path to    The JSON file with the list  of
  JSON file>                              partition for which leader elections
                                          should be performed. This is an
                                          example format. The desiredLeader field
                                          is only required in DESIGNATION election.
                                        
  The minimalReplicas field is only required in
                                          LONGEST_LOG election
                                        {{"partitions":
                                        	[{"topic": "foo", "partition": 1, "desiredLeader": 0, "minimalReplicas": 2},
                                        	 {"topic": "foobar", "partition": 2, "desiredLeader": 1, "minimalReplicas": 2}]
                                        }
                                        Not allowed if --all-topic-partitions
                                          or --topic flags are specified.
// Updated field ends.

--topic <String: topic name>            Name of topic for which to perform an
                                          election. Not allowed if --path-to-
                                          json-file or --all-topic-partitions
                                          is specified.
--version                               Display Kafka version.

...

The following gauge metrics will be added for ELR

  • kafka.replication.electable_replicas_count. It will be the sum of (size of ISR + size of ELR). It is a partition level metric.

The following gauge metrics will be added for Unclean Recovery

  • kafka.replication.unclean_recovery_partitions_count. It counts the partitions that are under unclean recovery. It will be unset/set to 0 when there is no unclean recovery happening. Note, if in Balance mode, the members in LastKnownELR are not all unfenced, it is also counted as a live recovery.
  • kafka.replication.manual_operation_requiredleader_election_required_partition_countunclean_recovery_partitions_count. It counts the partition that is leaderless and waits for user operations to find the next leader.

...

  • ELR. The main difference is in the leader election and the unclean leader election.
    • The unclean leader election will remain the same as the current. No change to the unclean.leader.election.enable and the behavior is random select an unfenced replica as the leader.
    • Leader election will be different when ISR and ELR are both empty. In this case, we try to maintain the "last known leader" behavior. Basically, when the last leader gets fenced, the LastKnownELR field will be also updated. The last leader will be put at the front of the LastKnownELR list. Then if the last leader can be unfenced, it will be elected as the leader. In this way, if only ELR is delivered, there is no regression in availability.
      • In summary, if unclean.leader.election.enable is false and the ELR is empty, the controller will elect the first replica in the LastKnownELR to be the leader when it is unfenced. If this replica can't be unfenced, then the controller will keep waiting.
  • Unclean recovery. 
    • The unclean leader election will be replaced by the unclean recovery.
    • unclean.leader.election.enable will only be replaced by the unclean.recovery.strategy after ELR is delivered.
    • As there is no change to the ISR, the "last known leader" behavior is maintained.

...