...

Current state: Under Discussion

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

A partition replica can experience local data loss in unclean shutdown scenarios where unflushed data in the OS page cache is lost, such as during an availability zone power outage or a server error. The Kafka replication protocol is designed to handle these situations by removing such replicas from the ISR and only re-adding them once they have caught up and therefore recovered any lost data. This prevents replicas that lost an arbitrary log suffix, which may have included committed data, from being elected leader.

However, there is a "last replica standing" state which, when combined with an unclean shutdown that loses data, can turn a local data loss scenario into a global one, i.e., committed data can be removed from all replicas. When the last replica in the ISR experiences an unclean shutdown and loses committed data, it will be re-elected leader after starting up again, causing rejoining followers to truncate their logs and thereby remove the last copies of the committed records that the leader initially lost.

This proposal solves this "last replica standing" data loss issue in KRaft clusters, providing MinISR-1 tolerance to data loss unclean shutdown events.

Consider the following "last replica standing" scenario for a partition with 3 healthy replicas in the ISR (0, 1, 2) and min.insync.replicas of 2.

  • At T0, a network partition occurs and broker 0 drops out of the ISR.

  • At T1, another network partition occurs, and broker 1 also leaves the ISR. Broker 2 becomes the leader of this partition.

  • At T2, broker 2 suffers an unclean shutdown that also causes it to lose part of its log. The current Kafka behavior prevents the ISR from dropping to empty, which keeps the last replica, broker 2, in the ISR. It also puts this partition into a leaderless state.

  • At T3, the network partitions heal. Brokers 0 and 1 come back, but the partition cannot elect a leader because these two brokers are not in the ISR.

  • At T4, broker 2 restarts and becomes the leader. Replication then begins, the rejoining followers truncate their logs to match, and the result is global data loss.

Proposed Changes

With ZooKeeper mode marked deprecated in Apache Kafka 3.5, only the fix in KRaft is in scope. 

Additional High Watermark advance requirement

A brief recap of some key concepts.

  • High Watermark.

    • In ISR, each server maintains a high watermark, which represents the highest offset of the replicated log known to be committed / durably stored.

    • Also, only messages below the High Watermark are visible to consumers.

  • Ack=1/all produce request. It defines when the Kafka server should respond to the produce request.

    • For ack=1 requests, the server should respond when the message is persisted in the leader’s local log.

    • For ack=all requests, the server should respond when the message is persisted in all ISR members' local logs and the ISR size is at least min.insync.replicas.

While we are seeking a solution to enhance the durability of ack=all messages, we must avoid interfering with ack=1 messages. In the scenario raised in the motivation section, the server may receive ack=1 messages during T1 and advance the High Watermark while the leader is the only member of the ISR. Then, if we elect broker 1 as the leader at T4, we can guarantee the safety of the ack=all messages, but the High Watermark may move backward, which has further impacts on the consumers.

To avoid this ack=1 message interference, we propose to enforce that the High Watermark can only advance if the ISR size is greater than or equal to min.insync.replicas (a sketch of this rule appears at the end of this subsection). Here are some clarifications:

  • It applies to ack=1 message replication as well. Note that the leader still acknowledges the client request once an ack=1 message has been persisted in the leader's log.

  • The ISR membership refers to the latest ISR membership persisted by the controller, not the "maximal ISR", which is defined by the leader as the current ISR members plus any pending-to-add replicas that have not yet been committed to the controller.

  • Note that if the maximal ISR is larger than the ISR, a message must still be replicated to the entire maximal ISR before it can be covered by the HWM. The proposal does not change this behavior.

As a side effect of the new requirement:

  • The current Kafka cluster allows the following topic creation configs:

    • min.insync.replicas > replication factor

    • min.insync.replicas > current cluster size

With these configs, the new requirement means ack=1 requests would all be acknowledged by the server while no messages could ever become visible to clients. For backward compatibility, the effective min.insync.replicas will be min(min.insync.replicas, replication factor).
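
To make the rule concrete, here is a minimal sketch of the High Watermark advancement check, including the backward-compatible effective min ISR. It is written in Java with hypothetical names; it illustrates the rule above and is not the actual broker code.

// Hypothetical sketch of the new HWM advancement rule, not actual Kafka code.
final class HighWatermarkPolicy {

    // Backward compatibility: a min.insync.replicas larger than the
    // replication factor is capped at the replication factor.
    static int effectiveMinIsr(int configuredMinIsr, int replicationFactor) {
        return Math.min(configuredMinIsr, replicationFactor);
    }

    // New rule: the HWM may only advance while the committed ISR (not the
    // leader's "maximal ISR") has at least effectiveMinIsr members. This
    // applies to ack=1 messages as well, although they are still
    // acknowledged once persisted in the leader's log.
    static boolean canAdvanceHighWatermark(int committedIsrSize,
                                           int configuredMinIsr,
                                           int replicationFactor) {
        return committedIsrSize >= effectiveMinIsr(configuredMinIsr, replicationFactor);
    }
}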

Eligible Leader Replicas

Our ultimate goal is to elect a leader without data loss or the HWM moving backward when at most min ISR - 1 replicas have had unclean shutdowns. Before we introduce the new mechanism, let's recap the ISR.

The current ISR primarily serves two purposes.

  1. It acts as a quorum for replication. The High Watermark is utilized to indicate the lower bound of the log offset that all ISR replicas have replicated.

  2. It functions as a candidate set for the leader. In the case of produce requests with ack=all, the leader will commit to the request only when the message has been replicated to the entire ISR. Thus, the controller ensures data safety by selecting any broker within the ISR as the leader.

Typically, the second function can be inferred from the first one. However, the current Kafka server commits ack=all messages only if the ISR size is at least min.insync.replicas. Together with the new HWM requirement that prevents the HWM from moving backward, this set of "min ISR" rules makes ISR membership a sufficient but not necessary condition for leader election. Consequently, in the motivating scenario, we can ensure the durability of the ack=all messages if we are somehow able to elect the out-of-ISR broker 1.

Therefore, we propose to separate the functions of the original ISR.

  • The ISR will continue to serve its replication function. Advancing the High Watermark still requires a quorum within the ISR. This ensures that replication between brokers remains unchanged.

  • To handle leader elections, we will introduce a concept called Eligible Leader Replicas (ELR). In addition to the ISR members, replicas in ELR are also eligible for leader election during a clean election process.

At a high level, we use the ELR to store the replicas that are not in the ISR but are guaranteed to have the data at least up to the High Watermark.

ISR invariants:

  • The ISR can now be empty. The proposal maintains the behavior of removing a replica from the ISR if it lags behind the leader or is fenced by the controller.

ELR invariants:

  • A member of the ELR is not in the ISR.

  • A member of the ELR has the log at least up to the HWM.

  • A member of the ELR may lag in replication or be in an unknown state from the controller's perspective (fenced).

  • If the ELR is not empty, the ISR is below min ISR.

  • The size of ELR + ISR will not drop below min ISR unless the controller discovers that an ELR member had an unclean shutdown.

    • The controller will remove the ELR member if it registers with an unclean shutdown.

    • The unclean shutdown detection is discussed in another section below.

Broker behaviors:

Neither the follower nor the leader has any new behavior to handle the ELR. They still refer to the ISR for decision making.

Controller behaviors:

ELR will be maintained purely on the controller side in the partition state. There are 4 ways to interact with the ELR:

  1. AlterPartition request. Although brokers are not explicitly aware of the ELR, they can indirectly modify it through the AlterPartition request. When the controller receives the new ISR, it will trigger an ELR update.

  2. The replica gets fenced. When this happens, the controller will trigger the ELR update with the updated ISR.

  3. The replica gets unfenced. If the replica is an ELR member and ISR is empty, this replica will be elected as leader, added to ISR, and removed from ELR.

  4. During the broker registration, if the broker had an unclean shutdown, the controller will remove the broker from ISR and ELR before persisting the registration record.

An ELR update takes a proposed ISR, and the controller does the following:

  1. When the proposed ISR is greater than or equal to min ISR in size, the controller will update the ISR and empty the ELR.

  2. When the proposed ISR is smaller than min ISR, the controller will

    1. retain the current members of the ELR,

    2. add the replicas in (the current ISR - the proposed ISR) to the ELR,

    3. remove from the ELR any replica that is also in the proposed ISR.

The high-level reasoning behind the above update rules is that the ELR only exists while the ISR is below min ISR, and during that time the HWM does not advance. Also, only members of the last ISR snapshot taken when the ISR dropped below min ISR can join the ELR, which guarantees that an ELR member has the log at least up to the HWM. A sketch of these rules follows.
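
The update rules can be summarized in a short sketch. The types and names below are hypothetical; the real logic lives in the controller's partition state handling.

import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of the ELR update rules, not actual controller code.
final class ElrUpdate {
    final Set<Integer> isr;  // the new committed ISR
    final Set<Integer> elr;  // the new ELR

    ElrUpdate(Set<Integer> isr, Set<Integer> elr) {
        this.isr = isr;
        this.elr = elr;
    }

    static ElrUpdate apply(Set<Integer> currentIsr, Set<Integer> currentElr,
                           Set<Integer> proposedIsr, int minIsr) {
        // Rule 1: at or above min ISR, update the ISR and empty the ELR.
        if (proposedIsr.size() >= minIsr) {
            return new ElrUpdate(proposedIsr, Set.of());
        }
        // Rule 2a: below min ISR, retain the current ELR members ...
        Set<Integer> newElr = new LinkedHashSet<>(currentElr);
        // Rule 2b: ... add the replicas that just dropped out of the ISR ...
        for (int replica : currentIsr) {
            if (!proposedIsr.contains(replica)) {
                newElr.add(replica);
            }
        }
        // Rule 2c: ... and remove any replica that is in both sets from the ELR.
        newElr.removeAll(proposedIsr);
        return new ElrUpdate(proposedIsr, newElr);
    }
}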

Here is an example that demonstrates most of the above ELR behaviors. The setup is 4 brokers with min ISR 3.

[Diagram: an example demonstrating the ELR behaviors above, for a partition with 4 brokers and min ISR 3]

A common question is whether we could advance the HWM when we have an ELR member (which is not replicating from the leader), thus violating the invariant that every ELR member has data at least up to the HWM. Consider the following example of a partition with 3 replicas and min ISR=2:

  1. ISR=[0], ELR=[1], broker 0 is the leader. Broker 1 is up but doesn’t have network connectivity.

  2. Broker 2 comes up, catches up, and the leader wants to add it to the ISR. At this point ISR=[0], but the Maximal ISR=[0, 2].

  3. Currently, we would advance the HWM because the message is replicated to 2 brokers (the ones in the Maximal ISR), but in the new protocol we wait until the controller commits ISR=[0,2], to avoid advancing the HWM beyond what ELR=[1] has.


Other behaviors:

  1. A change to the ELR does not require a leader epoch bump. In most cases, the ELR updates along with ISR changes. The only case where the ELR changes alone is when an ELR broker registers after an unclean shutdown; there is no need to bump the leader epoch in this case.

  2. When the config min.insync.replicas is updated, if the new min ISR <= the current ISR size, the ELR will be cleared.

  3. A new metric, Electable leaders, will be added. It reflects the count of (ISR + ELR).

  4. The AlterPartitionReassignments will not change the ELR. The leader later updates the ISR implicitly with AlterPartition requests.

  5. Add a new admin API, DescribeTopicRequest, for showing topic details. We don't want to embed the ELR info in the Metadata API; the ELR is not a detail that needs to be exposed to user clients.

  6. We also record the last-known ELR members.

    1. When an ELR member has an unclean shutdown, it is removed from the ELR and added to the LastKnownELR. The LastKnownELR is cleared when the ISR reaches min ISR.

    2. LastKnownELR is stored in the metadata log.

    3. LastKnownELR will be useful in the Unclean Recovery section.


Leader election

As the proposal changes a great deal of our behavior around the ISR, the leader election behavior will be described in detail in the Unclean Recovery section.

Detection of an unclean shutdown

The current log system creates a CleanShutdownFile after the logs have been flushed, right before shutdown. If the broker comes up again and finds this CleanShutdownFile, it can assume the log is complete after the reboot.

Based on CleanShutdownFile, we propose the following new behaviors.

  1. During the shutdown, write the current broker epoch in the CleanShutdownFile.

  2. On startup, the broker will try to read the broker epoch from the CleanShutdownFile and put it in the broker registration request.

  3. The controller will verify the broker epoch in the request against its registration record. If they match, it was a clean shutdown.
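
The following sketch shows the broker epoch round trip under these rules. The helper names and the minimal parsing are hypothetical; the CleanShutdownFile format itself is shown in the Public Interfaces section.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of clean shutdown detection, not actual broker code.
final class CleanShutdownTracker {
    private static final long NO_EPOCH = -1L;

    // On shutdown, after the logs are flushed, record the current broker epoch.
    static void writeOnShutdown(Path file, long brokerEpoch) throws IOException {
        Files.writeString(file, "{ \"BrokerEpoch\": \"" + brokerEpoch + "\" }");
    }

    // On startup, read the epoch back (NO_EPOCH if the file is missing, i.e.
    // the shutdown was unclean) and send it in the BrokerRegistration request.
    static long readOnStartup(Path file) throws IOException {
        if (!Files.exists(file)) {
            return NO_EPOCH;
        }
        String json = Files.readString(file);
        // Minimal parse for the single-field file; real code would use a JSON parser.
        return Long.parseLong(json.replaceAll("[^0-9-]", ""));
    }

    // Controller side: the shutdown was clean iff the epoch in the registration
    // request matches the epoch in the controller's registration record.
    static boolean wasCleanShutdown(long epochInRequest, long epochInRecord) {
        return epochInRequest != NO_EPOCH && epochInRequest == epochInRecord;
    }
}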

Unclean recovery

As the new proposal allows the ISR to be empty, the leader election strategy has to be reviewed.

  • unclean.leader.election.enable=true: the controller will randomly elect a leader if the last ISR member gets fenced.

  • unclean.leader.election.enable=false: the controller will only elect the last ISR member when it gets unfenced again.

The above “Last Leader” behavior can't be maintained once the ISR is allowed to be empty, so it should be removed. Also, randomly electing a leader clearly has room for improvement. As a result, we decided to enhance the unclean leader election and replace the unclean leader election config with an intent-based config.

unclean.recovery.strategy has the following 3 options.

  • Proactive. Recover availability as fast as possible.

  • Balanced. Automatically recover in potential data loss cases, waiting as needed for a better result.

  • Manual. Stop the partition on potential data loss and wait for user operations.

With the new config, the leader election decision will be made in the following order when the current leader is fenced:

  1. If there are other ISR members, choose an ISR member.

  2. If there are unfenced ELR members, choose an ELR member.

  3. If there are fenced ELR members:

    1. If the unclean.recovery.strategy=Proactive, then an unclean recovery will happen.

    2. Otherwise, we will wait for the fenced ELR members to be unfenced.

  4. If there are no ELR members:

    1. If unclean.recovery.strategy=Balanced, the controller will perform the unclean recovery.

    2. Otherwise (unclean.recovery.strategy=Manual), the controller will not attempt to elect a leader and will wait for user operations.

Note that unclean.recovery.strategy will be a topic-level config. A sketch of the decision order above follows.
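
The decision order can be sketched as follows. The types are hypothetical, and the helper simply signals whether a leader can be chosen now or the controller must wait.

import java.util.List;
import java.util.Optional;

enum UncleanRecoveryStrategy { PROACTIVE, BALANCED, MANUAL }

// Hypothetical sketch of the election decision order, not actual controller code.
final class LeaderChoice {

    // Returns the next leader, or empty if the controller must wait: for a
    // fenced ELR member to be unfenced, for an unclean recovery to finish,
    // or (in Manual mode) for user operations.
    static Optional<Integer> onLeaderFenced(List<Integer> otherIsrMembers,
                                            List<Integer> unfencedElrMembers,
                                            List<Integer> fencedElrMembers,
                                            UncleanRecoveryStrategy strategy,
                                            Runnable startUncleanRecovery) {
        // 1. Prefer the remaining ISR members.
        if (!otherIsrMembers.isEmpty()) return Optional.of(otherIsrMembers.get(0));
        // 2. Then the unfenced ELR members.
        if (!unfencedElrMembers.isEmpty()) return Optional.of(unfencedElrMembers.get(0));
        // 3. Fenced ELR members: recover now only in Proactive mode.
        if (!fencedElrMembers.isEmpty()) {
            if (strategy == UncleanRecoveryStrategy.PROACTIVE) startUncleanRecovery.run();
            return Optional.empty();  // otherwise wait for them to be unfenced
        }
        // 4. No ELR members at all: recover in Balanced mode, wait in Manual mode.
        if (strategy == UncleanRecoveryStrategy.BALANCED) startUncleanRecovery.run();
        return Optional.empty();
    }
}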

To support unclean recovery, we introduce a new component in the controller called the Unclean Recovery Manager.

Unclean Recovery Manager (URM)

The URM manages the recovery process for a leaderless partition. This new unclean recovery process takes the place of the unclean leader election. Instead of electing a random unfenced replica as the leader, the URM will query the log end offset and the leader epoch from each replica. The replica with the highest leader epoch and, among those, the largest log end offset will be the new leader.

The controller will trigger the unclean recovery when the leader is fenced and there is no other good candidate.

Workflow

The URM takes the partition info to initiate an unclean recovery task. The fields used in the recovery:

  • Topic and partition id

  • Replica IDs

Next, the URM will initiate the log queries through a new component, the BrokerRequestSender (BRS), which handles the RPC requests asynchronously. The query requests will be sent using a new GetReplicaLogInfo API. The response should include the following information for each partition:

  • Topic and partition id

  • Log end offset

  • Partition leader epoch in the log

  • Broker epoch

  • Current partition leader epoch in the metadata cache.

Once a GetReplicaLogInfo response is received, it will not be passed directly back to the URM; instead, the BRS will parse the response into a controller event and put it in the event queue for the URM to consume later. This minimizes the change to the controller's single-threaded structure.

The URM will verify the GetReplicaLogInfo response in the following ways:

  1. Reject the response if the broker epoch mismatches. This avoids electing a broker that has rebooted after sending the response.

  2. Reject the response if the partition leader epoch in the metadata cache mismatches the partition leader epoch on the controller side. This fences stale GetReplicaLogInfo responses.

  3. Note that if a response is rejected and a leader has not been elected, the URM will initiate the log query again.

After the verification, the URM will trigger the election when

  1. In Balanced mode, all the LastKnownELR members have replied.

  2. In Proactive mode, any replica has replied within a fixed window, or the first response is received after that window. We don't want to add a separate config for this, so the window is fixed at 5 seconds.

Then, during the election, the URM first filters for the replicas with the highest partition leader epoch, then elects the one with the largest log end offset as the new leader.
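
The verification and election steps might look like the following sketch, with a hypothetical response type that mirrors the GetReplicaLogInfo fields.

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// One replica's answer, mirroring the GetReplicaLogInfo response fields.
record ReplicaLogInfo(int brokerId, long brokerEpoch, int lastWrittenLeaderEpoch,
                      int currentLeaderEpoch, long logEndOffset) {}

// Hypothetical sketch of the URM verification and election, not actual code.
final class UncleanRecoveryElection {

    // Verification: reject stale responses before they can win the election.
    static boolean verify(ReplicaLogInfo r,
                          Map<Integer, Long> registeredBrokerEpochs,
                          int controllerLeaderEpoch) {
        // 1. The broker epoch must match the registration record, so that a
        //    broker that rebooted after replying cannot be elected.
        // 2. The replier's current leader epoch must match the controller's,
        //    which fences stale responses.
        return r.brokerEpoch() == registeredBrokerEpochs.getOrDefault(r.brokerId(), -1L)
            && r.currentLeaderEpoch() == controllerLeaderEpoch;
    }

    // Election: the highest last written leader epoch wins, with ties broken
    // by the largest log end offset.
    static Optional<Integer> elect(List<ReplicaLogInfo> verifiedResponses) {
        return verifiedResponses.stream()
            .max(Comparator.comparingInt(ReplicaLogInfo::lastWrittenLeaderEpoch)
                           .thenComparingLong(ReplicaLogInfo::logEndOffset))
            .map(ReplicaLogInfo::brokerId);
    }
}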

An ideal workflow

[Diagram: an ideal unclean recovery workflow between the controller (URM, BRS) and the replicas]

Failovers

  • Broker failover.

    • If the replica fails before it receives the GetReplicaLogInfo request, it can simply send the log info along with its current broker epoch once it restarts.

    • If the replica fails after it responds to the GetReplicaLogInfo request:

      • If the controller received the new broker registration, the URM can reject the response because the broker epoch in the request mismatches with the broker registration.

      • Otherwise, the replica may become the leader but will be fenced later when it registers.

  • Controller failover.

    • The URM does not store anything in the metadata log, so every controller failover will result in a new unclean recovery.

...

Public Interfaces

PartitionChangeRecord

{
  "apiKey": 5,
  "type": "metadata",
  "name": "PartitionChangeRecord",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Isr", "type":  "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
      "about": "null if the ISR didn't change; the new in-sync replicas otherwise." },
    { "name": "Leader", "type": "int32", "default": "-2", "entityType": "brokerId",
      "versions": "0+", "taggedVersions": "0+", "tag": 1,
      "about": "-1 if there is now no leader; -2 if the leader didn't change; the new leader otherwise." },
    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 2,
      "about": "null if the replicas didn't change; the new replicas otherwise." },
    { "name": "RemovingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 3,
      "about": "null if the removing replicas didn't change; the new removing replicas otherwise." },
    { "name": "AddingReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
      "about": "null if the adding replicas didn't change; the new adding replicas otherwise." },
    { "name": "LeaderRecoveryState", "type": "int8", "default": "-1", "versions": "0+", "taggedVersions": "0+", "tag": 5,
      "about": "-1 if it didn't change; 0 if the leader was elected from the ISR or recovered from an unclean election; 1 if the leader that was elected using unclean leader election and it is still recovering." },
// New fields begin.
    { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 6,
      "about": "null if the ELR didn't change; the new eligible leader replicas otherwise." }
    { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 7,
      "about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." }
// New fields end.
  ]
}
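
For illustration, a record for a partition whose ISR just dropped below a min ISR of 2 might carry the new tagged field as follows. The values, including the topic ID, are hypothetical, and unchanged tagged fields (such as LastKnownELR here) are simply omitted.

{
  "PartitionId": 0,
  "TopicId": "ZK2xS0tPQ7aSGJT4FvdEMg",
  "Isr": [2],
  "Leader": 2,
  "EligibleLeaderReplicas": [1]
}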

BrokerRegistration API

{
  "apiKey":62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID." },
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The cluster id of the broker process." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." },
    { "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
      "about": "If the required configurations for ZK migration are present, this value is set to true" },
// New fields begin.
    { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
      "about": "The broker's assigned epoch or the epoch before a clean shutdown." }
// New fields end.
  ]
}

DescribeTopicRequest

{
  "apiKey":69,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "DescribeTopicRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DescribeTopicResponseTopic", "versions": "0+",
      "about": "The topics to fetch details for.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." },
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "nullableVersions": "0+",
        "about": "The topic name." }
    ]}
]
}

DescribeTopicResponse

{
  "apiKey":69,
  "type": "request",
  "name": "DescribeTopicResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]MetadataResponseTopic", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The topic error, or 0 if there was no error." },
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." },
      { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
        "about": "True if the topic is internal." },
      { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
        "versions": "0+", "nullableVersions": "0+",
        "about": "null if the ELR didn't change; the new eligible leader replicas otherwise." },
      { "name": "LastKnownLeader", "type": "int32", "default": "null", "entityType": "brokerId",
        "versions": "0+", "nullableVersions": "0+",
        "about": "The last leader before the partition becomes leaderless." },
      { "name": "Partitions", "type": "[]MetadataResponsePartition", "versions": "0+",
        "about": "Each partition in the topic.", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error, or 0 if there was no error." },
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the leader broker." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The leader epoch of this partition." },
        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of all nodes that host this partition." },
        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of nodes that are in sync with the leader for this partition." },
        { "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId",
          "about": "The set of offline replicas of this partition." }
      ]}
    ]}
]
}

CleanShutdownFile

{ "BrokerEpoch":"xxx"}

ElectLeadersRequest

{
  "apiKey": 43,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "ElectLeadersRequest",
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
  { "name": "ElectionType", "type": "int8", "versions": "1+",
    "about": "Type of elections to conduct for the partition. A value of '0' elects the preferred replica. A value of '1' elects the desired replicas.
    A value of '2' elects the replica with the longest log among the last known ELRs. A value of '3' elects the replica with the longest log among all the replicas that can reply within a fixed time." },
  { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
    "about": "The topic partitions to elect leaders.",
    "fields": [
  { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
    "about": "The name of a topic." },
  { "name": "Partitions", "type": "[]int32", "versions": "0+",
    "about": "The partitions of this topic whose leader should be elected." },

// New fields begin.
  { "name": "DesiredLeaders", "type": "[]int32", "versions": "3+",
      "about": "The desired leaders. The entry should matches with the entry in Partitions by the index." },
  },
// New fields end.

  ] },
  { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
    "about": "The time in ms to wait for the election to complete." }
  ] 
}

GetReplicaLogInfo Request

{
  "apiKey":70,
  "type": "request",
  "listeners": ["broker"],
  "name": "GetReplicaLogInfoRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", 
        "about": "The ID of the broker." },
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
    "about": "The topic partitions to elect leaders.",
    "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of this topic whose leader should be elected." },
    ]}
] }

GetReplicaLogInfo Response

{
  "apiKey":70,
  "type": "response",
  "name": "GetReplicaLogInfoResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "about": "The epoch for the broker." }
    { "name": "LogInfoList", "type": "[]LogInfo", "versions": "0+", 
    "about": "The list of the log info.",
    "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partition", "type": "int32", "versions": "0+", "about": "The id for the partition." },
      { "name": "LastWrittenLeaderEpoch", "type": "int32", "versions": "0+", "about": "The last written leader epoch in the log." },
      { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "about": "The current leader epoch for the partition from the broker point of view." },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+", "about": "The log end offset for the partition." }
    ]}
] }

kafka-leader-election.sh

--admin.config <String: config file>    Configuration properties files to pass
                                          to the admin client
--all-topic-partitions                  Perform election on all of the
                                          eligible topic partitions based on
                                          the type of election (see the --
                                          election-type flag). Not allowed if
                                          --topic or --path-to-json-file is
                                          specified.
--bootstrap-server <String: host:port>  A hostname and port for the broker to
                                          connect to, in the form host:port.
                                          Multiple comma separated URLs can be
                                          given. REQUIRED.
// Updated field starts.
--election-type <[PREFERRED, LONGEST_LOG_PROACTIVE, LONGEST_LOG_BALANCED, DESIGNATION]:
                                          Type of election to attempt. Possible
  election type>                          values are "preferred" for preferred
                                          leader election,
                                          "longest_log_proactive"/"longest_log_balanced"
                                          to choose the replica
                                          with the longest log, or "designation" for
                                          electing the given replica. If
                                          preferred election is selected, the
                                          election is only performed if the
                                          current leader is not the preferred
                                          leader for the topic partition. If
                                          longest_log_proactive/longest_log_balanced/designation
                                          election is selected, the
                                          election is only performed if there
                                          is no leader for the topic
                                          partition. REQUIRED.
// Updated field ends.
--help                                  Print usage information.
--partition <Integer: partition id>     Partition id for which to perform an
                                          election. REQUIRED if --topic is
                                          specified.
                                          
// Updated field starts.                                        
--path-to-json-file <String: Path to    The JSON file with the list of
  JSON file>                              partitions for which leader elections
                                          should be performed. The following is
                                          an example format. The desiredLeader field
                                          is only required in DESIGNATION elections.
                                          The minimalReplicas field is only required in
                                          LONGEST_LOG elections.
                                        {"partitions":
                                        	[{"topic": "foo", "partition": 1, "desiredLeader": 0, "minimalReplicas": 2},
                                        	 {"topic": "foobar", "partition": 2, "desiredLeader": 1, "minimalReplicas": 2}]
                                        }
                                        Not allowed if --all-topic-partitions
                                          or --topic flags are specified.
// Updated field ends.

--topic <String: topic name>            Name of topic for which to perform an
                                          election. Not allowed if --path-to-
                                          json-file or --all-topic-partitions
                                          is specified.
--version                               Display Kafka version.
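
For example, a designation election for a leaderless partition could be run as follows (the host and file path are hypothetical):

bin/kafka-leader-election.sh --bootstrap-server localhost:9092 \
  --election-type designation \
  --path-to-json-file /tmp/election.json

where /tmp/election.json contains {"partitions": [{"topic": "foo", "partition": 1, "desiredLeader": 0}]}.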

Metrics

The following gauge metrics will be added

...

Comparing the current ISR model with the proposed design


Produce

  • Current: Ack=0, no ack is required; Ack=1, ack after writing to the local log.

  • Proposed: The same.

Consumer

  • Current: Clients can consume incoming messages even if the ISR size is below min ISR.

  • Proposed: Clients can't consume new messages if the ISR size is below min ISR.

Replication

  • Current: The HWM will move forward even if the ISR size is below min ISR.

  • Proposed: The HWM can not move forward if the ISR size is below min ISR.

Recovery when all replicas have been fenced

  • Current:

    1. An unclean leader election is required if the last ISR member can't come back.

    2. Only the last ISR member can be elected as the leader.

    3. The system takes all the data loss that the last ISR member has.

  • Proposed:

    1. An unclean recovery is required if no suitable replica can be elected.

    2. Any ELR member can be elected as the leader. There is no guarantee on ack=0/1 messages, but the loss will be the minimum possible.

Compatibility, Deprecation, and Migration Plan

...