Comment: Updates for Colin's comments

...

Additional High Watermark advancement requirement

We propose to enforce that the High Watermark can only advance if the ISR size is greater than or equal to min.insync.replicas.

To explain how we arrived at this proposal, here is a quick recap of some key concepts.

  • High Watermark.

    • In ISR, each server maintains a high watermark, which represents the highest offset of the replicated log known to be committed / durably stored.

    • Also, for consumers, only the messages below the High Watermark are visible to them.

  • Ack=0/1/all produce request. It defines when the Kafka server should respond to the produce request.

    • For ack=0 requests, the server does not respond to the request; it just appends the message to the leader's local log.
    • For ack=1 requests, the server should respond when the message is persisted in the leader’s local log.

    • For ack=all requests, the server should respond when the message is persisted in the local logs of all the ISR members and the ISR size is at least min ISR.

For a message sent to the server to become available for a consumer to fetch, there are 2 main steps. First, the produce request has to be acked by the server. This can be controlled by the produce request's ack requirement. Second, the replication has to be good enough to advance the HWM above the message. Notably, in the current system, min ISR matters only when accepting an ack=all request. It plays no role in accepting ack=0/1 requests or in the HWM advancement. This behavior complicates the durability model in a workload with mixed request types.

We mostly want to enhance the durability of the ack=all requests. In the scenario raised in the motivation section, the server may receive both ack=0/1 and ack=all messages during T1. During this period, the server will reject the ack=all requests because there are not enough replicas to meet the min ISR, but it accepts the ack=0/1 requests. Also, because the leader is the only one in the ISR, it is allowed to advance the HWM to the end of its log. Let's say there is some extra info somewhere that lets the controller choose broker 1 as the leader at T4 instead. Then there is no data loss for the ack=all requests, but the HWM stored on broker 1 is lower than the one on broker 2. This can cause the HWM to move backward and impact the consumers.

To avoid the ack=0/1 message interference, we propose to enforce that the High Watermark can only advance if the ISR size is greater than or equal to min.insync.replicas. Here are some clarifications:

  • It applies to the ack=0/1 message replication as well. Note that the leader still acknowledges the client requests when the ack=1 messages have persisted in the leader log.

  • The ISR membership refers to the latest ISR membership persisted by the controller, not the "maximal ISR" which is defined by the leader that includes the current ISR members and pending-to-add replicas that have not yet been committed to the controller.

  • If maximal ISR > ISR, the message should be replicated to the maximal ISR before covering the message under HWM. The ISR refers to the ISR committed by the controller.
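The advancement rule and its clarifications can be sketched as follows. This is only an illustration under stated assumptions: the `PartitionState` class, its field names, and `maybe_advance_hwm` are hypothetical and do not correspond to actual broker code.

```python
from dataclasses import dataclass


@dataclass
class PartitionState:
    """Hypothetical leader-side view of one partition (illustrative names)."""
    hwm: int                  # current high watermark
    log_end_offsets: dict     # replica id -> log end offset known to the leader
    committed_isr: set        # ISR as last persisted by the controller
    maximal_isr: set          # committed ISR plus pending-to-add replicas
    min_insync_replicas: int


def maybe_advance_hwm(state):
    """Return the high watermark after applying the proposed rule."""
    # Proposed rule: no advancement while the committed ISR is below min ISR.
    if len(state.committed_isr) < state.min_insync_replicas:
        return state.hwm
    # A message is covered by the HWM only once every member of the
    # maximal ISR has replicated it.
    candidate = min(state.log_end_offsets[r] for r in state.maximal_isr)
    # The HWM never moves backward in this sketch.
    return max(state.hwm, candidate)
```

With a single-member ISR and min.insync.replicas=2, the function returns the old HWM even if the leader has accepted new ack=0/1 messages, which is the behavior the proposal is after.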

...

With the proposal, the ack=0/1 requests will all be acknowledged by the server with the above config; however, no messages will be visible to the clients. For backward compatibility, the effective min.insync.replicas will be min(min.insync.replicas, replication factor).

...

  1. A change of the ELR does not require a leader epoch bump. In most cases, the ELR updates along with the ISR changes. The only case where the ELR changes alone is when an ELR broker registers after an unclean shutdown. In this case, there is no need to bump the leader epoch.

  2. When updating the config min.insync.replicas, if the new min ISR <= current ISR, the ELR will be removed.

  3. A new metric of Electable leaders will be added. It reflects the count of (ISR + ELR).

  4. The AlterPartitionReassignments. The leader updates the ISR implicitly with AlterPartition requests. The controller will make sure that upon completion, the ELR only contains replicas in the final replica set. Additionally, in order to improve the durability of the reassignment:

    1. In the current behavior, all the adding replicas should be in the ISR when the reassignment completes. This can leave only 1 replica in the ISR. Also, the ELR may not help here because the removed ISR replicas cannot stay in the ELR once the reassignment completes. So we propose to enforce that the reassignment can only be completed if the ISR size is greater than or equal to min ISR.
    2. This min ISR requirement is also enforced when the reassignment is canceled.
  5. Have a new admin API DescribeTopicRequest for showing the topic details. We don't want to embed the ELR info in the Metadata API, because the ELR is not a detail that needs to be exposed to user clients.

    1. More public facing details will be discussed in the DescribeTopicRequest section.
  6. We also record the last-known ELR members.

    1. It basically means when an ELR member has an unclean shutdown, it will be removed from ELR and added to the LastKnownELR. The LastKnownELR will be cleaned when ISR reaches the min ISR.

    2. LastKnownELR is stored in the metadata log.

    3. LastKnownELR will also be useful in the Unclean Recovery section.

  7. The last known leader will be tracked.
    1. This can be used if the Unclean recovery is not enabled. More details will be discussed in the Deliver Plan.
    2. The controller will record the last ISR member (the leader) when it is fenced.
    3. It will be cleaned when a new leader is elected.


Leader election

As the proposal changes much of our ISR-related behavior, the leader election behavior will be described in detail in the Unclean Recovery section.

...

  • unclean.leader.election.enable=true, the controller will randomly elect a leader if the last ISR member gets fenced.

  • unclean.leader.election.enable=false, the controller will only elect the last ISR member when it gets unfenced again.

The above “Last Leader” behavior can’t be maintained with an empty ISR and it should be removed. Also, randomly electing a leader is definitely worth improving. As a result, we decide to replace the unclean leader election with the Unclean Recovery.

The Unclean Recovery uses a deterministic way to elect the leader that persisted the most data. At a high level, once the unclean recovery is triggered, the controller will use a new API GetReplicaLogInfo to query the log end offset and the leader epoch from each replica. The one with the highest leader epoch and the longest log end offset will be the new leader. To help explain when and how the Unclean Recovery is performed, let's first introduce some config changes.

The current unclean.leader.election.enable will be replaced by an intent-based config, unclean.recovery.strategy, which has the following 3 options.

...

With the new config, the leader election decision will be made in the following order when the ISR and ELR meet the requirements:

  1. If there are other ISR members, choose an ISR member.

  2. If there are unfenced ELR members, choose an ELR member.

  3. If there are fenced ELR members:

    1. If the unclean.recovery.strategy=Proactive, then an unclean recovery will happen.

    2. Otherwise, we will wait for the fenced ELR members to be unfenced.

  4. If there are no ELR members:

    1. If the unclean.recovery.strategy=Proactive, the controller will do the unclean recovery.

    2. If the unclean.recovery.strategy=Balanced, the controller will do the unclean recovery when all the LastKnownELR are unfenced. See the following section for the explanations.
    3. Otherwise, unclean.recovery.strategy=Manual, the controller will not attempt to elect a leader. It waits for user operations.
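The decision order above can be written down as a small function. This is a hedged sketch: the function name, the action labels, and the set-based inputs are hypothetical, and the treatment of an empty LastKnownELR in Balanced mode (recover immediately) is an assumption the text does not spell out.

```python
def next_election_step(other_isr, elr, last_known_elr, unfenced, strategy):
    """Return (action, brokers) following the decision order in the list above.

    All broker arguments are sets of broker ids; `strategy` is one of
    "Proactive", "Balanced" or "Manual".
    """
    # 1. Prefer any remaining ISR member.
    if other_isr:
        return ("ELECT_FROM_ISR", sorted(other_isr))
    # 2. Then any unfenced ELR member.
    unfenced_elr = elr & unfenced
    if unfenced_elr:
        return ("ELECT_FROM_ELR", sorted(unfenced_elr))
    # 3. Only fenced ELR members remain.
    if elr:
        if strategy == "Proactive":
            return ("UNCLEAN_RECOVERY", [])
        return ("WAIT_FOR_ELR_UNFENCED", sorted(elr))
    # 4. No ELR members at all.
    if strategy == "Proactive":
        return ("UNCLEAN_RECOVERY", [])
    if strategy == "Balanced":
        # Assumption: an empty LastKnownELR counts as "all unfenced".
        if last_known_elr <= unfenced:
            return ("UNCLEAN_RECOVERY", [])
        return ("WAIT_FOR_LAST_KNOWN_ELR", sorted(last_known_elr - unfenced))
    return ("WAIT_FOR_USER", [])
```

For example, with an empty ISR and ELR, LastKnownELR = {1, 2} and only broker 1 unfenced, Balanced mode keeps waiting for broker 2 while Proactive mode starts the recovery right away.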

Note that the unclean.recovery.strategy will be a topic-level config.

In order to support the unclean recovery, we introduce a new component in the controller called the Unclean Recovery Manager.

Unclean Recovery Manager (URM)

The URM manages the recovery process for a leaderless partition. This new unclean recovery process takes the place of the unclean leader election. Instead of electing a random unfenced replica as the leader, the URM will query the log end offset and the leader epoch from each unfenced replica. The one with the highest leader epoch and the longest log end offset will be the new leader.

Workflow

The URM takes the topic partition to start an unclean recovery task.

Next, the URM will initiate the log query requests through a new component, the BrokerRequestSender (BRS), which handles the RPC requests asynchronously. The query requests will be sent using a new GetReplicaLogInfo API. The response should include the following information for each partition:

  • Topic and partition id

  • Log end offset

  • Partition leader epoch in the log

  • Broker epoch

  • Current partition leader epoch in the metadata cache.

Once the GetReplicaLogInfo response is received, it will not be passed directly back to the URM. Instead, the BRS will parse the response into a controller event and put it in the event queue. Later, the URM can consume the events. This behavior minimizes the change to the controller's single-threaded structure.

The URM will verify the GetReplicaLogInfo response in the following ways:

  1. Reject the response if the broker epoch does not match. This can avoid electing a broker that has rebooted after it made the response.

  2. Reject the response if the partition leader epoch in the metadata cache mismatches with the partition leader epoch on the controller side. This fences stale GetReplicaLogInfo responses.

  3. Note that if the response is rejected and the leader has not been elected, the URM will initiate the log query again.
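A minimal sketch of the two rejection checks, assuming the controller keeps a map of registered broker epochs and knows the partition leader epoch. `ReplicaLogInfo`, `is_valid_response`, and their field names are hypothetical names chosen for illustration; they are not the actual controller classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReplicaLogInfo:
    """Hypothetical per-partition payload of a GetReplicaLogInfo response."""
    broker_id: int
    broker_epoch: int
    log_end_offset: int
    last_written_leader_epoch: int  # partition leader epoch in the log
    current_leader_epoch: int       # leader epoch in the replica's metadata cache


def is_valid_response(info, registered_broker_epochs, controller_leader_epoch):
    """Apply the two verification rules from the list above."""
    # Rule 1: the broker must not have re-registered since it replied.
    if registered_broker_epochs.get(info.broker_id) != info.broker_epoch:
        return False
    # Rule 2: the replica's view of the leader epoch must match the controller's.
    if info.current_leader_epoch != controller_leader_epoch:
        return False
    return True
```

A rejected response would simply lead to another log query, as the last item in the list notes.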

After the verification, the URM will trigger the election when:

  1. In Balanced mode, all the LastKnownELR members have replied, plus the replicas that replied within the timeout. Due to this requirement, the controller will only start the recovery if the LastKnownELR members are all unfenced.

  2. In Proactive mode, any replicas that replied within a fixed amount of time OR the first response received after the timeout. We don’t want to add a separate config for it, so the fixed time is simply 5 seconds.

Then during the election, the URM first keeps only the replicas with the highest partition leader epoch, then it elects the one with the longest log end offset as the new leader.
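The selection step can be illustrated with a short function. It is a sketch under assumptions: responses are represented as plain tuples, and ties on both leader epoch and log end offset are broken by the lowest broker id, which the text does not specify.

```python
def pick_leader(responses):
    """Pick the new leader from verified responses.

    `responses` is a list of (broker_id, leader_epoch_in_log, log_end_offset)
    tuples. Returns the chosen broker id, or None if nothing was received.
    """
    if not responses:
        return None
    # Step 1: keep only the replicas with the highest partition leader epoch.
    highest_epoch = max(epoch for _, epoch, _ in responses)
    candidates = [r for r in responses if r[1] == highest_epoch]
    # Step 2: among those, take the longest log end offset.
    # Assumption: remaining ties go to the lowest broker id.
    best = max(candidates, key=lambda r: (r[2], -r[0]))
    return best[0]
```

Note that a replica with a longer log but a lower leader epoch loses: for `[(1, 5, 100), (2, 6, 80), (3, 6, 90)]` the function picks broker 3, not broker 1.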

An ideal workflow

[Figure removed: diagram of an ideal unclean recovery workflow]

Failovers

  • Broker failover.

    • If the replica fails before it receives the GetReplicaLogInfo request, it can just send the log info along with its current broker epoch.

    • If the replica fails after it responds to the GetReplicaLogInfo request:

      • If the controller received the new broker registration, the URM can reject the response because the broker epoch in the response does not match the broker registration.

      • Otherwise, the replica may become the leader but will be fenced later when it registers.

  • Controller failover.

    • The URM does not store anything in the metadata log, so every controller failover will result in a new unclean recovery.

Clarifications

  1. Only the unfenced replicas can be counted into the quorum. So when a replica gets unfenced, URM should check if it can be elected.

  2. In case of any unforeseen failure where the URM stops retrying the recovery or the task hangs for a long time, the controller will trigger the recovery again when handling heartbeats.

Broker Request Sender

This component is also a part of the controller. It mainly handles the RPC requests concurrently. It should have limited access to the controller components to avoid adding concurrent handling to the controller.

It accepts to-broker requests and requires a callback function that parses each response into a controller event. The event will be put in the controller event queue.

In order to batch requests, the BRS maintains per-broker request queues. BRS can merge the requests in the queue and send them in one request.
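The per-broker batching could look roughly like the following. This is a sketch only: `BrokerRequestQueues` and its methods are hypothetical, the request body merely mimics the GetReplicaLogInfo request fields, and the default cap reuses the 2000-partitions-per-request limit stated for that API.

```python
from collections import defaultdict, deque


class BrokerRequestQueues:
    """Hypothetical per-broker queues of pending log-info queries."""

    def __init__(self):
        self._queues = defaultdict(deque)  # broker id -> queued (topic_id, partition)

    def enqueue(self, broker_id, topic_id, partition):
        self._queues[broker_id].append((topic_id, partition))

    def drain(self, broker_id, max_partitions=2000):
        """Merge queued queries for one broker into a single request body."""
        queue = self._queues[broker_id]
        by_topic = {}
        taken = 0
        while queue and taken < max_partitions:
            topic_id, partition = queue.popleft()
            by_topic.setdefault(topic_id, []).append(partition)
            taken += 1
        return {
            "BrokerId": broker_id,
            "TopicPartitions": [
                {"TopicId": topic, "Partitions": parts}
                for topic, parts in by_topic.items()
            ],
        }
```

Anything beyond the cap stays queued and goes out with the next drained request.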

Clarifications

  1. If the retry timeout is reached or there are any network issues between the controller and the broker, the BRS will parse the error into a controller event to let the URM trigger the next round.

  2. The BRS will use separate threads from the controller.

Other

  1. The kafka-leader-election.sh tool will be upgraded to allow manual leader election.

    1. It can directly select a leader.

    2. It can trigger an unclean recovery for the replica with the longest log in either Proactive or Balance mode.

  2. The current partition reassignment can be finished as long as the added replicas are in the ISR.

The controller will initiate the Unclean Recovery once a partition meets the above conditions. As mentioned above, the controller collects the log info from the replicas. Apart from the Log end offset and the partition leader epoch in the log, the replica also returns the following for verification:

  • Replica's broker epoch. This can avoid electing a broker that has rebooted after it made the response.
  • The current partition leader epoch in the replica's metadata cache. This is to fence stale GetReplicaLogInfo responses.

The controller can start an election when:

  • In Balanced mode, all the LastKnownELR members have replied, plus the replicas that replied within the timeout. Due to this requirement, the controller will only start the recovery if the LastKnownELR members are all unfenced.
  • In Proactive mode, any replicas that replied within a fixed amount of time OR the first response received after the timeout.


The behaviors in the failover:

  • Broker failover.

    • If the replica fails before it receives the GetReplicaLogInfo request, it can just send the log info along with its current broker epoch.

    • If the replica fails after it responds to the GetReplicaLogInfo request:

      • If the controller receives the new broker registration, the controller can reject the response because the broker epoch in the response does not match the broker registration.

      • Otherwise, the replica may become the leader but will be fenced later when it registers.

  • Controller failover.

    • The controller does not store anything in the metadata log, so every controller failover will result in a new unclean recovery.

Other

  1. The kafka-leader-election.sh tool will be upgraded to allow manual leader election.

    1. It can directly select a leader.

    2. It can trigger an unclean recovery for the replica with the longest log in either Proactive or Balance mode.

  2. Configs to add
    1. unclean.recovery.strategy. Described in the above section. 
    2. unclean.recovery.Enabled. True for enabling the unclean recovery. False otherwise.
    3. unclean.recovery.timeout.ms. The time limits of waiting for the replicas' response during the Unclean Recovery.
  3. For a better user experience, the unclean.recovery.strategy and unclean.leader.election.enable will be converted if unclean.recovery.Enabled is changed.
    1. unclean.recovery.Enabled from false to true

      unclean.leader.election.enable | unclean.recovery.strategy
      false                          | Balanced
      true                           | Proactive


    2. unclean.recovery.Enabled from true to false

      unclean.recovery.strategy | unclean.leader.election.enable
      Proactive                 | true
      Balanced                  | false
      Manual                    | false
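The two conversion tables above can be expressed as a pair of lookups. This is only a sketch; the function names are hypothetical and the real conversion would live in the controller's config handling.

```python
# Mapping applied when unclean.recovery.Enabled changes from true to false.
_STRATEGY_TO_ULE = {"Proactive": True, "Balanced": False, "Manual": False}


def to_recovery_strategy(unclean_leader_election_enable):
    """unclean.recovery.Enabled false -> true: derive the strategy."""
    return "Proactive" if unclean_leader_election_enable else "Balanced"


def to_unclean_leader_election_enable(strategy):
    """unclean.recovery.Enabled true -> false: derive the legacy flag."""
    return _STRATEGY_TO_ULE[strategy]
```

The round trip is lossy: a topic configured as Manual maps to unclean.leader.election.enable=false, which maps back to Balanced if the feature is enabled again.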


Public Interfaces

We will deliver the KIP in phases, so the API changes are also marked coming with either ELR or Unclean Recovery.

PartitionChangeRecord (coming with ELR)

{
  "apiKey": 5,
  "type": "metadata",
  "name": "PartitionChangeRecord",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
...
// New fields begin.
    { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 6,
      "about": "null if the ELR didn't change; the new eligible leader replicas otherwise." },
    { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 7,
      "about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." },
    { "name": "LastKnownLeader", "type": "int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 8,
      "about": "-1 means no last known leader needs to be tracked." }
// New fields end.
  ]
}

PartitionRecord (coming with ELR)

{
  "apiKey": 3,
  "type": "metadata",
  "name": "PartitionRecord",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
...
// New fields begin.
    { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 1,
      "about": "The eligible leader replicas of this partition." },
    { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 2,
      "about": "The last known eligible leader replicas of this partition." },
    { "name": "LastKnownLeader", "type": "int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 8,
      "about": "-1 means no last known leader needs to be tracked." }
// New fields end.
  ]
}

BrokerRegistration API (coming with ELR)

{
  "apiKey":62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
...
// New fields begin.
    { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
      "about": "The epoch before a clean shutdown." }
// New fields end.
  ]
}

DescribeTopicRequest (Coming with ELR)

Should be issued by admin clients. The broker will serve this request. As a part of the change, the admin client will start to use DescribeTopicRequest to query the topic, instead of using the metadata requests.

On the other hand, the following changes may affect the client side.

  • The TopicPartitionInfo will also be updated to include the ELR info.
  • kafka-topics.sh does not have changes to its API but will have new fields in the output for the ELR, LastKnownELR, and LastKnownLeader.

ACL: Describe Topic

Limit: 20 topics max per request

{
  "apiKey":69,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeTopicRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]string", "versions": "0+",
      "about": "The topics to fetch details for.", "entityType": "topicName"}
]
}

DescribeTopicResponse

{
  "apiKey":69,
  "type": "response",
  "name": "DescribeTopicResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]MetadataResponseTopic", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The topic error, or 0 if there was no error." },
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." },
      { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
        "about": "True if the topic is internal." },
      { "name": "Partitions", "type": "[]MetadataResponsePartition", "versions": "0+",
        "about": "Each partition in the topic.", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error, or 0 if there was no error." },
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the leader broker." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The leader epoch of this partition." },
        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of all nodes that host this partition." },
        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of nodes that are in sync with the leader for this partition." },
        { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
          "versions": "0+", "nullableVersions": "0+",
          "about": "The eligible leader replicas of this partition." },
        { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
          "versions": "0+", "nullableVersions": "0+",
          "about": "The last known ELR." },
        { "name": "LastKnownLeader", "type": "int32", "default": "null", "entityType": "brokerId",
          "versions": "0+", "nullableVersions": "0+",
          "about": "The last known leader." },
        { "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId",
          "about": "The set of offline replicas of this partition." }
      ]},
      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
        "about": "32-bit bitfield to represent authorized operations for this topic." }
    ]}
  ]
}

CleanShutdownFile (Coming with ELR)

{ "BrokerEpoch":"xxx"}

ElectLeadersRequest (Coming with Unclean Recovery)

ACL: CLUSTER_ACTION

Limit: 20 topics per request.

{
  "apiKey": 43,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "ElectLeadersRequest",
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
  ...
  { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
    "about": "The topic partitions to elect leaders.",
    "fields": [
    ...

// New fields begin.
    { "name": "DesiredLeaders", "type": "[]int32", "versions": "3+",
      "about": "The desired leaders. The entry should match the entry in Partitions by the index." }
// New fields end.

  ] },
  { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
    "about": "The time in ms to wait for the election to complete." }
  ]
}

GetReplicaLogInfo Request (Coming with Unclean Recovery)

ACL: CLUSTER_ACTION

Limit: 2000 partitions per request.

{
  "apiKey":70,
  "type": "request",
  "listeners": ["broker"],
  "name": "GetReplicaLogInfoRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the broker." },
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
      "about": "The topic partitions to elect leaders.",
      "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of this topic whose leader should be elected." }
    ]}
] }

GetReplicaLogInfo Response

{
  "apiKey":70,
  "type": "response",
  "name": "GetReplicaLogInfoResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "about": "The epoch for the broker." },
    { "name": "LogInfoList", "type": "[]LogInfo", "versions": "0+",
      "about": "The list of the log info.",
      "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partition", "type": "int32", "versions": "0+", "about": "The id for the partition." },
      { "name": "LastWrittenLeaderEpoch", "type": "int32", "versions": "0+", "about": "The last written leader epoch in the log." },
      { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "about": "The current leader epoch for the partition from the broker point of view." },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+", "about": "The log end offset for the partition." }
    ]}
] }

kafka-leader-election.sh (Coming with Unclean Recovery)

...
// Updated field starts.
--election-type <[PREFERRED, UNCLEAN, LONGEST_LOG_PROACTIVE, LONGEST_LOG_BALANCED, DESIGNATION]:
                                          Type of election to attempt. Possible
  election type>                          values are "preferred" for preferred
                                          leader election, or "unclean" for a random
                                          unclean leader election,

CleanShutdownFile (Coming with ELR)

{ "BrokerEpoch":"xxx"}

ElectLeadersRequest (Coming with Unclean Recovery)

{
  "apiKey": 43,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "ElectLeadersRequest",
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
  { "name": "ElectionType", "type": "int8", "versions": "1+",
    "about": "Type of elections to conduct for the partition. A value of '0' elects the preferred replica. A value of '1' elects the desired replicas.
    A value of '2' elects the replica with the longest log among the last known ELRs. A value of '3' elects the replica with the longest log among all the replicas that can reply within a fixed time." },
  { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
    "about": "The topic partitions to elect leaders.",
    "fields": [
  { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
    "about": "The name of a topic." },
  { "name": "Partitions", "type": "[]int32", "versions": "0+",
    "about": "The partitions of this topic whose leader should be elected." },

// New fields begin.
  { "name": "DesiredLeaders", "type": "[]int32", "versions": "3+",
      "about": "The desired leaders. Each entry should match the entry in Partitions at the same index." }
// New fields end.

  ] },
  { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
    "about": "The time in ms to wait for the election to complete." }
  ] 
}

GetReplicaLogInfo Request (Coming with Unclean Recovery)

ACL: CLUSTER_ACTION

{
  "apiKey":70,
  "type": "request",
  "listeners": ["broker"],
  "name": "GetReplicaLogInfoRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", 
        "about": "The ID of the broker." },
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
    "about": "The topic partitions to query the log info.",
    "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of this topic whose log info should be returned." }
    ]}
] }

GetReplicaLogInfo Response

{
  "apiKey":70,
  "type": "response",
  "name": "GetReplicaLogInfoResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "about": "The epoch for the broker." },
    { "name": "LogInfoList", "type": "[]LogInfo", "versions": "0+", 
    "about": "The list of the log info.",
    "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partition", "type": "int32", "versions": "0+", "about": "The id for the partition." },
      { "name": "LastWrittenLeaderEpoch", "type": "int32", "versions": "0+", "about": "The last written leader epoch in the log." },
      { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "about": "The current leader epoch for the partition from the broker point of view." },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+", "about": "The log end offset for the partition." }
    ]}
] }
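
As a rough illustration of how the LogInfo fields in the response might be used, the sketch below picks the reply with the longest log. Comparing LastWrittenLeaderEpoch first and LogEndOffset second, and breaking ties by the lowest broker ID, are assumptions made for this example; the `LogInfo` class and `pick_longest_log` helper are hypothetical names, not part of the proposed API.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class LogInfo:
    """Hypothetical per-replica view built from a GetReplicaLogInfo reply."""
    broker_id: int
    last_written_leader_epoch: int
    log_end_offset: int


def pick_longest_log(replies: Iterable[LogInfo]) -> Optional[LogInfo]:
    """Return the reply with the highest (epoch, offset) pair, or None if empty.

    Assumption: a higher last written leader epoch wins before the log end
    offset is compared. Ties go to the lowest broker id for determinism.
    """
    return max(
        replies,
        key=lambda r: (r.last_written_leader_epoch, r.log_end_offset, -r.broker_id),
        default=None,
    )


replies = [LogInfo(0, 5, 120), LogInfo(1, 6, 90), LogInfo(2, 6, 100)]
best = pick_longest_log(replies)
```

Here broker 0 has the largest offset but an older epoch, so under the stated assumption broker 2 would be chosen.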

kafka-leader-election.sh (Coming with Unclean Recovery)

--admin.config <String: config file>    Configuration properties files to pass
                                          to the admin client
--all-topic-partitions                  Perform election on all of the
                                          eligible topic partitions based on
                                          the type of election (see the --
                                          election-type flag). Not allowed if
                                          --topic or --path-to-json-file is
                                          specified.
--bootstrap-server <String: host:port>  A hostname and port for the broker to
                                          connect to, in the form host:port.
                                          Multiple comma separated URLs can be
                                          given. REQUIRED.
// Updated field starts.
--election-type <[PREFERRED, UNCLEAN, LONGEST_LOG_PROACTIVE, LONGEST_LOG_BALANCED, DESIGNATION]:
                                          Type of election to attempt. Possible
  election type>                          values are "preferred" for preferred
                                          leader election, or "unclean" for
                                          a random unclean leader election,
                                          or "longest_log_proactive"/"longest_log_balanced"
                                          to choose the replica 
                                          with the longest log or "designation" for
                                          electing the given replica to be the leader. If
                                          preferred election is selected, the
                                          election is only performed if the
                                          current leader is not the preferred
                                          leader for the topic partition. If
                                          longest_log_proactive/longest_log_balanced/designation 
                                          election is selected, the
                                          election is only performed if there
                                          is no leader for the topic
                                          partition. REQUIRED.
// Updated field ends.
--help                                  Print usage information.
--partition <Integer: partition id>     Partition id for which to perform an
                                          election. REQUIRED if --topic is
                                          specified.
                                          
// Updated field starts.                                        
--path-to-json-file <String: Path to    The JSON file with the list of
  JSON file>                              partitions for which leader elections
                                          should be performed. This is an
                                          example format. The desiredLeader field
                                          is only required in DESIGNATION election.
                                        
                                        {"partitions":
                                        	[{"topic": "foo", "partition": 1, "desiredLeader": 0},
                                        	 {"topic": "foobar", "partition": 2, "desiredLeader": 1}]
                                        }
                                        Not allowed if --all-topic-partitions
                                          or --topic flags are specified.
// Updated field ends.

--topic <String: topic name>            Name of topic for which to perform an
                                          election. Not allowed if --path-to-
                                          json-file or --all-topic-partitions
                                          is specified.
--version                               Display Kafka version.
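
The relationship between the JSON file format and the ElectLeadersRequest fields can be sketched as follows. This is only an illustration under assumptions: the helper name `to_topic_partitions` and its `designation` flag are hypothetical, and the real tool may group or validate entries differently. It shows how the per-entry desiredLeader values could be lined up with Partitions by index.

```python
import json
from collections import defaultdict


def to_topic_partitions(json_text, designation=False):
    """Group JSON-file entries by topic into request-style dictionaries.

    Hypothetical helper: when `designation` is True, every entry must carry
    a desiredLeader, and DesiredLeaders[i] corresponds to Partitions[i].
    """
    entries = json.loads(json_text)["partitions"]
    grouped = defaultdict(lambda: {"Partitions": [], "DesiredLeaders": []})
    for entry in entries:
        if designation and "desiredLeader" not in entry:
            raise ValueError("desiredLeader is required for DESIGNATION election")
        slot = grouped[entry["topic"]]
        slot["Partitions"].append(entry["partition"])
        if designation:
            slot["DesiredLeaders"].append(entry["desiredLeader"])
    return [{"Topic": topic, **fields} for topic, fields in grouped.items()]


example = """
{"partitions":
  [{"topic": "foo", "partition": 1, "desiredLeader": 0},
   {"topic": "foobar", "partition": 2, "desiredLeader": 1}]
}
"""
request_topics = to_topic_partitions(example, designation=True)
```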


Metrics

The following metrics will be added for ELR

  • kafka.controller.global_under_min_isr_partition_count. Gauge. It tracks the number of partitions with ISR size smaller than min ISR.

The following metrics will be added for Unclean Recovery

  • kafka.controller.unclean_recovery_partitions_count. Gauge. It tracks the number of partitions that are under unclean recovery. It is unset or set to 0 when no unclean recovery is in progress. Note that in Balance mode, a partition whose LastKnownELR members are not all unfenced is also counted as a live recovery.
  • kafka.controller.manual_leader_election_required_partition_count. Gauge. It counts the partitions that are leaderless and waiting for user operations to find the next leader.
  • kafka.controller.unclean_recovery_finished_count. Counter. It counts how many unclean recoveries have completed. It resets to 0 when the controller restarts.


Public-Facing Changes

  1. The High Watermark will only advance if all the messages below it have been replicated to at least min.insync.replicas ISR members.

  2. Consumers are affected when consuming ack=1 and ack=0 messages. When only 1 replica is in the ISR (with min ISR=2), HWM advancement is blocked, so incoming ack=0/1 messages are not visible to consumers. Users can avoid this side effect by setting min.insync.replicas to 1 for their ack=0/1 topics.

  3. Compared to the current model, the proposed design has availability trade-offs:

    1. If a network partition only affects the heartbeats between a follower and the controller, the controller will kick the follower out of the ISR. If losing this replica brings the ISR below min ISR, HWM advancement will be blocked unnecessarily, because we require the ISR to have at least min ISR members. At this point, this is not a regression compared to the current system. However, once the network partition heals, the current leader today puts the follower into the pending ISR (aka "maximum ISR") and continues moving forward, while in the proposed design the leader needs to wait for the controller to ack the ISR change.

    2. Electing a leader from the ELR may mean choosing a degraded broker. Degraded means the broker is alive but may replicate poorly, for common reasons such as networking or disk IO problems. That may also be the reason it fell out of the ISR in the first place. This is a trade-off between availability and durability.

  4. The unclean leader election will be replaced by the unclean recovery.

  5. For fsync users, the ELR is beneficial because it offers more choices when the last known leader is fenced. It is worth mentioning what to expect when the ISR and ELR are both empty. We assume fsync users set unclean.leader.election.enable to false.
    1. If the KIP has been fully implemented, unclean.recovery.strategy will be Balanced. During the unclean recovery, the controller will elect a leader when all the LastKnownELR members have replied.
    2. If only the ELR is implemented, the LastKnownLeader is preferred when the ELR and ISR are both empty.
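
The High Watermark rule in item 1 can be illustrated with a minimal sketch. It assumes the leader knows the log end offset of each ISR member and that the candidate HWM is the smallest of those offsets; the function name and arguments are hypothetical and do not reflect broker internals.

```python
from typing import Dict


def next_high_watermark(current_hwm: int,
                        isr_log_end_offsets: Dict[int, int],
                        min_insync_replicas: int) -> int:
    """Return the HWM after applying the proposed advancement rule.

    Assumption: isr_log_end_offsets maps each ISR member (leader included)
    to its log end offset. The HWM stays put when the ISR is smaller than
    min.insync.replicas and never moves backward.
    """
    if not isr_log_end_offsets or len(isr_log_end_offsets) < min_insync_replicas:
        return current_hwm
    return max(current_hwm, min(isr_log_end_offsets.values()))


# Leader alone in the ISR with min ISR = 2: the HWM is blocked.
blocked = next_high_watermark(10, {1: 25}, min_insync_replicas=2)
# Two ISR members: the HWM moves to the smaller log end offset.
advanced = next_high_watermark(10, {1: 25, 2: 18}, min_insync_replicas=2)
```

With min.insync.replicas set to 1, the same call with only the leader in the ISR would advance the HWM to the leader's log end offset, which matches the workaround suggested for ack=0/1 topics.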

Ack=0/1 comparison

Comparing the current ISR model with the proposed design:

Produce

  • Current:
    • Ack=0. No ack is required.
    • Ack=1. Ack after writing to the local log.
  • Proposed: The same.

Consumer

  • Current: Clients can consume incoming messages if the ISR size is below min ISR.
  • Proposed: Clients can't consume new messages if the ISR size is below min ISR.

Replication

  • Current: HWM will move forward even if the ISR size is below min ISR.
  • Proposed: HWM cannot move forward if the ISR size is below min ISR.

Recover when all replicas have been fenced

  • Current:
    1. An unclean leader election is required if the last member in the ISR can't come back.
    2. Only the last ISR member can be elected as the leader.
    3. The system accepts whatever data loss the last ISR member has.
  • Proposed:
    1. An unclean recovery is required if no suitable replica can be elected.
    2. A member of the ELR can be elected as the leader. There is no guarantee for ack=0/1 messages, but the loss will be as small as possible.

Compatibility, Deprecation, and Migration Plan

High Watermark advance requirement

  • min.insync.replicas can no longer effectively exceed the replication factor. For existing configs, the effective value will be min(min.insync.replicas, replication factor).

  • Cluster admins should set min.insync.replicas to 1 if they want replication to keep going when only the leader is in the ISR.
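
The capping rule above is simple arithmetic; a minimal sketch, with a hypothetical helper name, is:

```python
def effective_min_isr(configured_min_isr: int, replication_factor: int) -> int:
    """Cap the configured min.insync.replicas at the replication factor."""
    return min(configured_min_isr, replication_factor)


# An existing topic with min.insync.replicas=3 but only 2 replicas.
capped = effective_min_isr(3, 2)
# A typical topic where the config is already below the replication factor.
unchanged = effective_min_isr(2, 3)
```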

ELR

It will be guarded by a new metadata version. So it is not enabled during the rolling upgrade.

After the controller picks up the new MV and loads the partition states, it will populate the ELR as empty if the PartitionChangeRecord uses an old version. In the next partition update, the controller will record the current ELR.

Note that the leader can only allow an empty ISR once the new metadata version is in effect.

Clean shutdown

It will be guarded by a new metadata version. Before that, the controller will treat all registrations as following unclean shutdowns.

Unclean recovery

Unclean Recovery is guarded by the feature flag unclean.recovery.Enabled. unclean.leader.election.enable and unclean.recovery.strategy are converted automatically.

Delivery plan

The KIP is a large plan that may span multiple quarters, so we have to consider how to deliver the project in phases.

The ELR will be delivered in the first phase. When Unclean Recovery has not shipped or is disabled:

The main difference is in the leader election and the unclean leader election.

  • If there are other ISR members, elect one of them.
  • If there are other unfenced ELR members, elect one of them.
  • If there are fenced ELR members
    • unclean.leader.election.enable false, then elect the first ELR member to be unfenced.
    • unclean.leader.election.enable true, start an unclean leader election.
  • If there are no ELR members
    • unclean.leader.election.enable true, start an unclean leader election.
    • unclean.leader.election.enable false, then elect the LastKnownLeader once it is unfenced.

The unclean leader election will randomly choose an unfenced replica, as it does today.
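
The decision list above can be sketched as a small function. This is an illustrative reading of the bullets, not the controller's implementation: the function name, its arguments, and the returned action labels are all hypothetical, and `isr` is assumed to hold only the remaining ISR members other than the failed leader.

```python
from typing import Optional, Sequence, Set, Tuple


def choose_leader(isr: Sequence[int],
                  elr: Sequence[int],
                  unfenced: Set[int],
                  last_known_leader: Optional[int],
                  unclean_enabled: bool) -> Tuple[str, Optional[int]]:
    """Return (action, broker) for the ELR-only phase described above."""
    if isr:
        return ("elect", isr[0])
    live_elr = [b for b in elr if b in unfenced]
    if live_elr:
        return ("elect", live_elr[0])
    if unclean_enabled:
        # Covers both "fenced ELR members" and "no ELR members".
        return ("unclean_election", None)
    if elr:
        # Wait for the first ELR member to be unfenced.
        return ("wait_for_elr_member", None)
    if last_known_leader is not None and last_known_leader in unfenced:
        return ("elect", last_known_leader)
    return ("wait_for_last_known_leader", last_known_leader)


decision = choose_leader(isr=[], elr=[3, 4], unfenced={4},
                         last_known_leader=1, unclean_enabled=False)
```

In this example the ISR is empty and broker 4 is the only unfenced ELR member, so it is elected without an unclean election.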

...

Unclean recovery

  1. For the existing unclean.leader.election.enable

    1. If true, unclean.recovery.strategy will be set to Proactive.

    2. If false, unclean.recovery.strategy will be set to Balanced.

  2. unclean.recovery.strategy is guarded by the metadata version. Ideally, it should be enabled in the same MV as the ELR change.

  3. The unclean leader election behavior is kept until the MV upgrade.

  4. Once unclean recovery is enabled, the MV is not downgradable.
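
The conversion in item 1, together with the metadata version guard in items 2 and 3, can be sketched as below. The function name and the `mv_supports_unclean_recovery` flag are hypothetical; returning `None` to mean "keep the legacy unclean leader election behavior" is a choice made for this example only.

```python
from typing import Optional


def converted_recovery_strategy(unclean_leader_election_enable: bool,
                                mv_supports_unclean_recovery: bool) -> Optional[str]:
    """Map the existing config onto unclean.recovery.strategy.

    Returns None before the MV upgrade, meaning the legacy unclean leader
    election behavior is kept.
    """
    if not mv_supports_unclean_recovery:
        return None
    return "Proactive" if unclean_leader_election_enable else "Balanced"


strategy_for_unclean_true = converted_recovery_strategy(True, True)
strategy_for_unclean_false = converted_recovery_strategy(False, True)
```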

Delivery plan

The KIP is a large plan that may span multiple quarters, so we have to consider how to deliver the project in phases.

As Unclean Recovery can be developed in parallel with the ELR, let's discuss what the expected behavior would be if only one of them has been delivered.

...

  • The unclean leader election will remain the same as it is today. There is no change to unclean.leader.election.enable, and the behavior is to randomly select an unfenced replica as the leader.
  • Leader election will be different when the ISR and ELR are both empty. In this case, we try to maintain the "last known leader" behavior. When the last leader gets fenced, the LastKnownELR field is also updated, and the last leader is put at the front of the LastKnownELR list. Then, if the last leader can be unfenced, it will be elected as the leader. In this way, if only the ELR is delivered, there is no regression in availability.
    • In summary, if unclean.leader.election.enable is false and the ELR is empty, the controller will elect the first replica in the LastKnownELR as the leader once it is unfenced. If this replica can't be unfenced, the controller will keep waiting.
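
A minimal sketch of the "last known leader" bookkeeping described above follows. Both helper names are hypothetical, and the sketch assumes LastKnownELR is an ordered list of broker IDs.

```python
from typing import List, Optional, Set


def record_fenced_leader(last_known_elr: List[int], fenced_leader: int) -> List[int]:
    """Return a new LastKnownELR with the fenced leader moved to the front."""
    return [fenced_leader] + [b for b in last_known_elr if b != fenced_leader]


def leader_when_elr_empty(last_known_elr: List[int], unfenced: Set[int]) -> Optional[int]:
    """With unclean.leader.election.enable=false and an empty ELR, only the
    first LastKnownELR entry may be elected, and only once it is unfenced."""
    if last_known_elr and last_known_elr[0] in unfenced:
        return last_known_elr[0]
    return None  # keep waiting


updated = record_fenced_leader([2, 1, 3], fenced_leader=1)
still_waiting = leader_when_elr_empty(updated, unfenced={2, 3})
elected = leader_when_elr_empty(updated, unfenced={1})
```

Brokers 2 and 3 being unfenced is not enough here: the controller keeps waiting until broker 1, the last leader, comes back.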

...


Test Plan

The typical suite of unit/integration/system tests. The system tests will verify the behavior under different failure scenarios.

...

It is an option to enhance the ListOffsets API instead of creating a new API. The intention of the new API is to distinguish the admin API from the client API. Specifically, it is necessary to include the broker epoch in the recovery, but this piece of information does not need to be exposed to the clients.

...

The controller elects the leader when a given number of replicas have replied

In Balance mode, instead of waiting for the last-known ELR members, the controller elects a leader once it has received a given number of responses. For example, suppose the replication factor is 3 and min ISR is 2. If we require at least 2 responses, the controller will choose between the first 2 replicas.

However, this does not give enough protection if we don't require responses from all the replicas. Consider the following case. The ISR starts as [0,1,2]. Broker 2 falls behind and is kicked out of the ISR. Then broker 1 and broker 0 suffer unclean shutdowns, and broker 1 has real data loss. Later, broker 2 and broker 1 come online, and the controller chooses between 1 and 2. Either option will have data loss.

...