Versions Compared

Comment: Address comments from Jun Rao

...

  • It applies to the ack=1 message replication as well. Note that the leader still acknowledges the client requests once the ack=1 messages have been persisted in the leader log.

  • The ISR membership refers to the latest ISR membership persisted by the controller, not the "maximal ISR", which is defined by the leader and includes the current ISR members plus pending-to-add replicas that have not yet been committed to the controller.

  • Note that if maximal ISR > ISR, the message should be replicated to the maximal ISR before covering the message under HWM. The proposal does not change this behavior. ISR refers to the ISR committed by the controller.

As a side effect of the new requirement:

...

  • The ISR will continue to serve its replication function. Advancing the High Watermark still requires replication to the full ISR. This ensures that replication between brokers remains unchanged.

  • To handle leader elections, we will introduce a concept called Eligible Leader Replicas (ELR). In addition to the ISR members, replicas in ELR are also eligible for leader election during a clean election process.

...

Here is an example that demonstrates most of the above ELR behaviors. The setup is 4 brokers with min ISR 3.

[Image: ELR behavior example, 4 brokers with min ISR 3]

A common question is whether we could advance the HWM while there is an ELR member (which is not replicating from the leader), thus violating the invariant that every ELR member has data at least up to the HWM. Consider the following example of a 3-replica partition with min ISR=2:

...

  1. If there are other ISR members, choose an ISR member.

  2. If there are unfenced ELR members, choose an ELR member.

  3. If there are fenced ELR members:

    1. If unclean.recovery.strategy=Proactive, an unclean recovery will happen.

    2. Otherwise, we will wait for the fenced ELR members to be unfenced.

  4. If there are no ELR members:

    1. If unclean.recovery.strategy=Balanced or Proactive, the controller will do the unclean recovery.

    2. Otherwise (unclean.recovery.strategy=Manual), the controller will not attempt to elect a leader and waits for user operations.
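
The selection order above can be sketched as a small decision function. This is only an illustration, not the controller implementation: the function name, the action labels, and the lowest-broker-id tie-break are assumptions, and `isr` is assumed to contain only the remaining ISR members that can actually become leader.

```python
from enum import Enum


class Strategy(Enum):
    # Values of unclean.recovery.strategy as named in the list above.
    PROACTIVE = "Proactive"
    BALANCED = "Balanced"
    MANUAL = "Manual"


def next_leader_action(isr, elr, fenced, strategy):
    """Return (action, replica) following the four-step order above.

    isr, elr: iterables of broker ids; fenced: set of fenced broker ids.
    The action labels and the min-id tie-break are hypothetical.
    """
    isr, elr = list(isr), list(elr)
    # Step 1: another ISR member exists, so elect it.
    if isr:
        return ("elect", min(isr))
    # Step 2: fall back to an unfenced ELR member.
    unfenced_elr = [r for r in elr if r not in fenced]
    if unfenced_elr:
        return ("elect", min(unfenced_elr))
    # Step 3: only fenced ELR members remain.
    if elr:
        if strategy is Strategy.PROACTIVE:
            return ("unclean_recovery", None)
        return ("wait_for_unfence", None)
    # Step 4: no ELR members at all.
    if strategy in (Strategy.BALANCED, Strategy.PROACTIVE):
        return ("unclean_recovery", None)
    return ("wait_for_user", None)
```

For example, with an empty ISR, a single fenced ELR member, and the Balanced strategy, the sketch returns `("wait_for_unfence", None)`.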

...

  1. In Balanced mode, all the LastKnownELR members have replied, plus the replicas that replied within the timeout.

  2. In Proactive mode, any replicas that replied within a fixed amount of time, OR the first response received after the timeout. We don’t want to add a separate config for it, so the fixed time is set to 5 seconds.
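
One possible reading of these two conditions is sketched below. It is an interpretation rather than the KIP's definition: the function name, the `replies` map (replica id to reply time in seconds since the recovery started), and the choice to return the candidate set are all assumptions made for illustration.

```python
PROACTIVE_WAIT_SECONDS = 5.0  # the fixed time mentioned above


def election_candidates(mode, last_known_elr, replies, now, timeout):
    """Return sorted candidate replica ids once an election may start, else None.

    replies maps replica id -> reply time (seconds since recovery start).
    Hypothetical helper; the semantics are one reading of the text above.
    """
    arrived = {r: t for r, t in replies.items() if t <= now}
    if mode == "Balanced":
        elr = set(last_known_elr)
        # Not ready until every LastKnownELR member has replied.
        if not elr <= set(arrived):
            return None
        # Other replicas only count if they replied within the timeout.
        return sorted(r for r, t in arrived.items() if r in elr or t <= timeout)
    if mode == "Proactive":
        if now < PROACTIVE_WAIT_SECONDS:
            return None  # still inside the fixed collection window
        in_window = sorted(r for r, t in arrived.items()
                           if t <= PROACTIVE_WAIT_SECONDS)
        if in_window:
            return in_window
        if arrived:
            # Nobody replied in time: take the first late response.
            return [min(arrived, key=arrived.get)]
        return None
    raise ValueError(f"unknown mode: {mode}")
```

Under this reading, Balanced mode trades recovery time for a better chance of finding a replica from the LastKnownELR, while Proactive mode bounds the wait at 5 seconds whenever at least one replica answers.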

...

{
  "apiKey":62,
  "type": "request",
  "listeners": ["controller"],
  "name": "BrokerRegistrationRequest",
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker ID." },
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "about": "The cluster id of the broker process." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation id of the broker process." },
    { "name": "Listeners", "type": "[]Listener",
      "about": "The listeners of this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The name of the endpoint." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The hostname." },
      { "name": "Port", "type": "uint16", "versions": "0+",
        "about": "The port." },
      { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
        "about": "The security protocol." }
    ]
    },
    { "name": "Features", "type": "[]Feature",
      "about": "The features on this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    ]
    },
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The rack which this broker is in." },
    { "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
      "about": "If the required configurations for ZK migration are present, this value is set to true" },
// New fields begin.
    { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
      "about": "The broker's assigned epoch or the epoch before a clean shutdown." }
// New fields end.
  ]
}

DescribeTopicRequest

Should be issued by admin clients or brokers. The controller will serve this request.

ACL: Describe Topic

{
  "apiKey":69,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "DescribeTopicRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DescribeTopicResponseTopic", "versions": "0+",
      "about": "The topics to fetch details for.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." },
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "nullableVersions": "0+",
        "about": "The topic name." }
    ]}
]
}

...

GetReplicaLogInfo Request

ACL: Read Topic

{
  "apiKey":70,
  "type": "request",
  "listeners": ["broker"],
  "name": "GetReplicaLogInfoRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the broker." },
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
      "about": "The topic partitions to query the log info.",
      "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions of this topic to query the log info for." }
    ]}
  ]
}

...

--admin.config <String: config file>    Configuration properties files to pass
                                          to the admin client
--all-topic-partitions                  Perform election on all of the
                                          eligible topic partitions based on
                                          the type of election (see the --
                                          election-type flag). Not allowed if
                                          --topic or --path-to-json-file is
                                          specified.
--bootstrap-server <String: host:port>  A hostname and port for the broker to
                                          connect to, in the form host:port.
                                          Multiple comma separated URLs can be
                                          given. REQUIRED.
// Updated field starts.
--election-type <[PREFERRED, UNCLEAN,   Type of election to attempt. Possible
  LONGEST_LOG_PROACTIVE,                  values are "preferred" for preferred
  LONGEST_LOG_BALANCED, DESIGNATION]:     leader election, "unclean" for a
  election type>                          random unclean leader election,
                                          "longest_log_proactive" or
                                          "longest_log_balanced" to choose the
                                          replica with the longest log, or
                                          "designation" to elect the given
                                          replica as the leader. If preferred
                                          election is selected, the election is
                                          only performed if the current leader
                                          is not the preferred leader for the
                                          topic partition. If
                                          longest_log_proactive,
                                          longest_log_balanced, or designation
                                          election is selected, the election is
                                          only performed if there is no leader
                                          for the topic partition. REQUIRED.
// Updated field ends.
--help                                  Print usage information.
--partition <Integer: partition id>     Partition id for which to perform an
                                          election. REQUIRED if --topic is
                                          specified.
                                          
// Updated field starts.                                        
--path-to-json-file <String: Path to    The JSON file with the list of
  JSON file>                              partitions for which leader elections
                                          should be performed. This is an
                                          example format. The desiredLeader
                                          field is only required for
                                          DESIGNATION elections. The
                                          minimalReplicas field is only
                                          required for LONGEST_LOG elections.
                                        {"partitions":
                                          [{"topic": "foo", "partition": 1, "desiredLeader": 0, "minimalReplicas": 2},
                                           {"topic": "foobar", "partition": 2, "desiredLeader": 1, "minimalReplicas": 2}]
                                        }
                                        Not allowed if --all-topic-partitions
                                          or --topic flags are specified.
// Updated field ends.

--topic <String: topic name>            Name of topic for which to perform an
                                          election. Not allowed if --path-to-
                                          json-file or --all-topic-partitions
                                          is specified.
--version                               Display Kafka version.

...

  • kafka.replication.electable_replicas_count. It will be the sum of (size of ISR + size of ELR).

The following count metrics will be added.

  • kafka.replication.unclean_recovery_partitions_count. It counts the partitions that are under unclean recovery.
  • kafka.replication.

...

  • manual_operation_required_partition_count. It counts the partitions that are leaderless and waiting for user operations to find the next leader.


Public-Facing Changes

  1. min.insync.replicas now applies to the replication of all kinds of messages. The High Watermark will only advance if all the messages below it have been replicated to at least min.insync.replicas ISR members.

  2. Consumers are affected when consuming ack=1 and ack=0 messages. When the ISR has only 1 replica (min ISR=2), the HWM advancement is blocked, so the incoming ack=0/1 messages are not visible to consumers. Users can avoid this side effect by setting min.insync.replicas to 1 for their ack=0/1 topics.

  3. Compared to the current model, the proposed design has availability trade-offs:

    1. If the network partitioning only affects the heartbeats between a follower and the controller, the controller will kick the follower out of the ISR. If losing this replica brings the ISR below min ISR, the HWM advancement will be blocked unnecessarily because we require the ISR to have at least min ISR members. At this point, this is not a regression compared to the current system. However, once the network partitioning ends, the current leader will put the follower into the pending ISR (aka "maximal ISR") and continue moving forward, whereas in the proposed design the leader needs to wait for the controller to ack the ISR change.

    2. Electing a leader from ELR may mean choosing a degraded broker. Degraded means the broker is alive but may replicate poorly due to common causes such as networking or disk IO problems. This may also be why it fell out of the ISR in the first place. This is a trade-off between availability and durability.

  4. The unclean leader election will be replaced by the unclean recovery.

  5. For fsync users, the ELR can be beneficial because it offers more choices when the last known leader is fenced. It is worth mentioning what to expect when ISR and ELR are both empty. We assume fsync users set unclean.leader.election.enable to false.
    1. If the KIP has been fully implemented, unclean.recovery.strategy will be Balanced. During the unclean recovery, the URM will elect a leader when all the LastKnownELR members have replied.
    2. If only the ELR or Unclean recovery is implemented, the LastKnownLeader is preferred.
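
The High Watermark rule from items 1 and 2 above can be illustrated with a short sketch. It assumes the leader tracks a log end offset (LEO) per replica, that `maximal_isr` is a superset of the controller-committed `isr`, and that all function and parameter names are hypothetical.

```python
def advance_hwm(current_hwm, leo_by_replica, isr, maximal_isr, min_insync_replicas):
    """Return the new High Watermark under the proposed rule.

    isr: ISR committed by the controller; maximal_isr: isr plus
    pending-to-add replicas. Names are illustrative assumptions.
    """
    # The HWM is blocked while the committed ISR is below min.insync.replicas.
    if len(isr) < min_insync_replicas:
        return current_hwm
    # Otherwise a message is covered only once every maximal-ISR member has it.
    candidate = min(leo_by_replica[r] for r in maximal_isr)
    # The HWM never moves backwards.
    return max(current_hwm, candidate)
```

With `isr={1}` and `min_insync_replicas=2`, the function returns `current_hwm` unchanged no matter how far the leader's log has grown, which is the consumer-visible effect described in item 2.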

...

  1. For the existing unclean.leader.election.enable

    1. If true, unclean.recovery.strategy will be set to Proactive.

    2. If false, unclean.recovery.strategy will be set to Balanced.

  2. unclean.recovery.strategy is guarded by the metadata version (MV). Ideally, it should be enabled with the same MV as the ELR change.

  3. The unclean leader election behavior is retained until the MV upgrade.

  4. Once the unclean recovery is enabled, the MV is not downgradable.
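
The mapping in item 1 and the MV guard in items 2 and 3 can be summarized as follows. This is a minimal sketch: the function names and the boolean `mv_supports_unclean_recovery` flag are assumptions, not part of the proposal.

```python
def migrated_recovery_strategy(unclean_leader_election_enable):
    """Map the existing boolean config to the new strategy (item 1)."""
    return "Proactive" if unclean_leader_election_enable else "Balanced"


def effective_behavior(mv_supports_unclean_recovery, unclean_leader_election_enable):
    """Return (mechanism, setting) for a partition; names are hypothetical."""
    if not mv_supports_unclean_recovery:
        # Before the MV upgrade, the legacy unclean leader election applies.
        return ("unclean_leader_election", unclean_leader_election_enable)
    return ("unclean_recovery",
            migrated_recovery_strategy(unclean_leader_election_enable))
```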

Delivery plan

The KIP is a large plan that can span multiple quarters, so we have to consider how to deliver the project in phases.

...