Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


This KIP proposes changing the ISR expansion logic on the leader and and the ISR validation logic on the controller to avoid bringing back fenced or shutting down replicas in the ISR. The leader will consider only unfenced active (not fenced nor in controlled shutdown) replicas to be eligible to join the ISR. It will rely on the metadata cache to get this information via the metadata log. The controlled shutdown state is not persisted to the metadata log at the moment. This KIP proposes to persist it for the leader to leverage it.

As the metadata cache is eventually consistent, the leader might try to add a replica - which was just removed by the controller - back to the ISR because it does not know that the replica was fenced by the controller 's latest state yet. In order to avoid this, the controller will validate the new ISR and reject any AlterPartition request containing an ineligible replica with the newly introduced INELIGIBLE_REPLICA error code. For backward compatibility, OPERATION_NOT_ATTEMPTED will be used for older versionsThe leader will consider only active (not fenced nor in controlled shutdown) replicas to be eligible. When the leader receives an INELIGIBLE_REPLICA error code, it is expected to revert back its state to the last committed state - assuming that the state did not change in the mean time. When a broker is unfenced by the controller, the leader does nothing because subsequent fetch requests from the followers will try to get them back into the ISR if they are caught-up.


The version of the AlterPartition API is bumped to version 2.


As we need to bump the version anyway, we propose to include the TopicId  field in the request.

Code Block
  "apiKey": 56,
  "type": "request",
  "listeners": ["zkBroker", "controller"],
  "name": "AlterPartitionRequest",
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the requesting broker" },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The epoch of the requesting broker" },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0-1", "entityType": "topicName",
        "about": "The name of the topic to alter ISRs for" },
      // New Field Begin
      { "name": "TopicId", "type": "uuid", "versions": "2+", "ignorable": true,
        "about": "The ID of the topic to alter ISRs for" }, 
      // New Field End 
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index" },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of this partition" },
        { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ISR for this partition" },
        { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0",
          "about": "1 if the partition is recovering from an unclean leader election; 0 otherwise." },
        { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
          "about": "The expected epoch of the partition which is being updated. For legacy cluster this is the ZkVersion in the LeaderAndIsr request." }


The INELIGIBLE_REPLICA is returned in the response if any of the replicas in the new ISR contains a fenced or shutting down replica.

Code Block
  "apiKey": 56,
  "type": "response",
  "name": "AlterPartitionResponse",
  "validVersions": "0-2",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level response error code" },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name":  "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic" },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index" },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition level error code" },
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The broker ID of the leader." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch." },
        { "name": "Isr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The in-sync replica IDs." },
        { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0", "ignorable": true,
          "about": "1 if the partition is recovering from an unclean leader election; 0 otherwise." },
        { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
          "about": "The current epoch for the partition for KRaft controllers. The current ZK version for the legacy controllers." }

Cluster Metadata Records


The InControlledShutdown  field is added to the RegisterBrokerRecord to persist when a broker starts the controlled shutdown procedure.

Code Block
  "apiKey": 0,
  "type": "metadata",
  "name": "RegisterBrokerRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The broker id." },
    { "name": "IncarnationId", "type": "uuid", "versions": "0+",
      "about": "The incarnation ID of the broker process" },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "uint16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    { "name": "Features", "type": "[]BrokerFeature",
      "about": "The features on this broker", "versions": "0+", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The feature name." },
      { "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported feature level." },
      { "name": "MaxSupportedVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported feature level." }
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The broker rack." },
    { "name": "Fenced", "type": "bool", "versions": "0+", "default": "true",
      "about": "True if the broker is fenced." },
    // New Field Begin
    { "name": "InControlledShutdown", "type": "bool", "versions": "0+", "default": "false",
      "about": "True if the broker is in controlled shutdown." } 
    // New Field End


The InControlledShutdown field is added to the BrokerRegistrationChangeRecord as well.

Code Block
  "apiKey": 17,
  "type": "metadata",
  "name": "BrokerRegistrationChangeRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
   { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
     "about": "The broker id." },
   { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
     "about": "The broker epoch assigned by the controller." },
   { "name": "Fenced", "type": "int8", "versions": "0+", "taggedVersions": "0+", "tag": 0,
     "about": "-1 if the broker has been unfenced, 0 if no change, 1 if the broker has been fenced." },
   // New Field Begin
   { "name": "InControlledShutdown", "type": "int8", "versions": "0+", "taggedVersions": "0+", "tag": 1,
     "about": "-1 if the broker is not in controlled shutdown anymore, 0 if not change, 1 if the broker is in controlled shutdown." } 
   // New Field End 

Compatibility, Deprecation, and Migration Plan

The change is backward compatibleFor backward compatibility, OPERATION_NOT_ATTEMPTED will be used for older versions of the AlterPartition API.

Rejected Alternatives
