Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: "Under DiscussionAccepted"

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here  

Vote thread: here 

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-14617

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • The ISR expansion is triggered by the Fetch requests. Using Fetch requests makes the expansion synchronized. Broker is online → Fetch and catch up → candidate for ISR → expand with AlterPartition. The flow is straightforward and easy to maintain.

  • Using broker metadata requires more careful thinking about potential issues. For example, if the last fetch request comes super late, the leader already knows the new broker epoch and sends the AlterPartition with the new one, then the rebooted broker can still be added to ISR.

The new behaviors break down:

  • Also, as a precautions check, the leader will only attempt to include the broker in ISR if the broker epoch from the metadata cache matches the epoch from the Fetch request.


The new behaviors break down:

  1. Follower

    • The

    Follower

    • The follower will send its epoch id (received when registering) in the Fetch request.

  2. Leader

    • The leader will keep track of the follower epochs received through the Fetch request. The epochs can be maintained in the ReplicaState.

    • The leader will include the proposed ISR follower epochs in the AlterPartition request. Also, the leader will verify these epochs against the metadata cache. Before the broker epochs from the Fetch request and the metadata cache are consistent, the leader will not propose to include the broker in the ISR.

    • The leader will discard the proposed ISR change and revert to the old committed ISR if the The leader will discard the proposed ISR change and revert to the old committed ISR if the controller returns INELIGIBLE_REPLICA for the partition.

  3. Controller

    • The controller will make sure all the broker epochs in the ISR match with the latest broker epochs before persisting this change. If not, it will reject the request with the error code INELIGIBLE_REPLICA.

...

Request

Code Block
{
  "apiKey": 56 56,
  "type":  "request",
  "listeners": ["zkBroker",  "controller"],
  "name":  "AlterPartitionRequest",
  "validVersions":  "0-23",
  "flexibleVersions":  "0+",
  "fields": [
    {  "name":  "BrokerId",  "type":  "int32",  "versions":  "0+",  "entityType":  "brokerId",
      "about":  "The ID of the requesting broker"  },
    {  "name":  "BrokerEpoch",  "type":  "int64",  "versions":  "0+",  "default":  "-1",
      "about":  "The epoch of the requesting broker"  },
    {  "name":  "Topics",  "type":  "[]TopicData",  "versions":  "0+",  "fields": [
      {  "name":  "TopicName",  "type":  "string",  "versions":  "0-1",  "ignorable": true true,  "entityType":  "topicName",
        "about":  "The name of the topic to alter ISRs for"  },
      {  "name":  "TopicId",  "type":  "uuid",  "versions":  "2+",  "ignorable": true true,
        "about":  "The ID of the topic to alter ISRs for"  },
      {  "name":  "Partitions",  "type":  "[]PartitionData",  "versions":  "0+",  "fields": [
        {  "name":  "PartitionIndex",  "type":  "int32",  "versions":  "0+",
          "about":  "The partition index"  },
        {  "name":  "LeaderEpoch",  "type":  "int32",  "versions":  "0+",
          "about":  "The leader epoch of this partition"  },
// Old Field deprecated.
        {  "name":  "NewIsr",  "type":  "[]int32",  "versions":  "0-2",  "entityType":  "brokerId",
          "about":  "The ISR for this partition"  },
// Old Field deprecated.
// New Field begin
        {  "name":  "NewIsrWithEpochs",  "type":  "[]BrokerState",  "versions":  "3+",  "fields": [
            {  "name":  "BrokerId",  "type":  "int32",  "versions":  "3+",  "entityType":  "brokerId",
              "about":  "The ID of the broker."  },
        	{  "name":  "BrokerEpoch",  "type":  "int64",  "versions":  "3+",
               "aboutdefault": "The epoch of  "-1",
               "about": "The epoch of the broker. It will be -1 if the epoch check is not supported."  },
         ]},
// New Field End
        {  "name":  "LeaderRecoveryState",  "type":  "int8",  "versions":  "1+",  "default":  "0",
          "about":  "1 if the partition is recovering from an unclean leader election; 0 otherwise."  },
        {  "name":  "PartitionEpoch",  "type":  "int32",  "versions":  "0+",
          "about":  "The expected epoch of the partition which is being updated. For legacy cluster this is the ZkVersion in the LeaderAndIsr request."  }
      ]}
    ]}
  ]
}

Fetch request RPC

Code Block
linenumberstrue
{
  "apiKey": 1 1,
  "type":  "request",
  "listeners": ["zkBroker",  "broker",  "controller"],
  "name":  "FetchRequest",
  "validVersions":  "0-1315",
  "flexibleVersions":  "12+",
  "fields": [
    {  "name":  "ClusterId",  "type":  "string",  "versions":  "12+",  "nullableVersions":  "12+",  "default":  "null",
      "taggedVersions":  "12+",  "tag": 0 0,  "ignorable": true true,
      "about":  "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId",  },

// Deprecate this field. It is now in the ReplicaState. Also change the default value to -1.
	{ "name": "ReplicaId", "type":  "int32",  "versions":  "0+-14",  "entityTypedefault":  "brokerId-1",
      	  "aboutentityType": "The "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer."  },  
// Update ends

// New Field begin
        	{  "name":  "BrokerEpochReplicaState",  "type":  "int64ReplicaState",  "versionstaggedVersions": "1415+",  "ignorabletag": true 1,
           "aboutfields": "The epoch of this follower." },
// New Field End
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true [
	{ "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
	  "about": "The replica ID of the follower, of -1 if this request is from a consumer." },
	{ "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1",
	  "about": "The epoch of this follower." }
	]},
// New Field End

    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about":  "The maximum time bytesin milliseconds to fetch.  See KIP-74 wait for cases where this limit may not be honored." the response." },
    {  "name":  "IsolationLevelMinBytes",  "type":  "int8int32",  "versions":  "4+", "default": "0", "ignorable": true,0+",
      "about":  "ThisThe settingminimum controlsbytes the to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,
      "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records"  },
    {  "name":  "SessionId",  "type":  "int32",  "versions":  "7+",  "default":  "0",  "ignorable": true true,
      "about":  "The fetch session ID."  },
    {  "name":  "SessionEpoch",  "type":  "int32",  "versions":  "7+",  "default":  "-1",  "ignorable": true true,
      "about":  "The fetch session epoch, which is used for ordering requests in a session."  },
    {  "name":  "Topics",  "type":  "[]FetchTopic",  "versions":  "0+",
      "about":  "The topics to fetch.",  "fields": [
      {  "name":  "Topic",  "type":  "string",  "versions":  "0-12",  "entityType":  "topicName",  "ignorable":  "true",
        "about":  "The name of the topic to fetch."  },
      {  "name":  "TopicId",  "type":  "uuid",  "versions":  "13+",  "ignorable": true true,  "about":  "The unique topic ID"},
      {  "name":  "Partitions",  "type":  "[]FetchPartition",  "versions":  "0+",
        "about":  "The partitions to fetch.",  "fields": [
        {  "name":  "Partition",  "type":  "int32",  "versions":  "0+",
          "about":  "The partition index."  },
        {  "name":  "CurrentLeaderEpoch",  "type":  "int32",  "versions":  "9+",  "default":  "-1",  "ignorable": true true,
          "about":  "The current leader epoch of the partition."  },
        {  "name":  "FetchOffset",  "type":  "int64",  "versions":  "0+",
          "about":  "The message offset."  },
        {  "name":  "LastFetchedEpoch",  "type":  "int32",  "versions":  "12+",  "default":  "-1",  "ignorable": false false,
          "about":  "The epoch of the last fetched record or -1 if there is none"},
        {  "name":  "LogStartOffset",  "type":  "int64",  "versions":  "5+",  "default":  "-1",  "ignorable": true true,
          "about":  "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower."},
        {  "name":  "PartitionMaxBytes",  "type":  "int32",  "versions":  "0+",
          "about":  "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored."  }
      ]}
    ]},
    {  "name":  "ForgottenTopicsData",  "type":  "[]ForgottenTopic",  "versions":  "7+",  "ignorable": false false,
      "about":  "In an incremental fetch request, the partitions to remove.",  "fields": [
      {  "name":  "Topic",  "type":  "string",  "versions":  "7-12",  "entityType":  "topicName",  "ignorable": true true,
        "about":  "The topic name."  },
      {  "name":  "TopicId",  "type":  "uuid",  "versions":  "13+",  "ignorable": true true,  "about":  "The unique topic ID"},
      {  "name":  "Partitions",  "type":  "[]int32",  "versions":  "7+",
        "about":  "The partitions indexes to forget."  }
    ]},
    {  "name":  "RackId",  "type":  "string",  "versions":  "11+",  "default":  "",  "ignorable": true true,
      "about":  "Rack ID of the consumer making this request"}
  ]
} of the consumer making this request"}
  ]
}

Note: The replica id is moved to ReplicaState because we can benefit from reducing the consumer fetch request message size. The ReplicaState is a tagged field and will be the default value if it is from the consumers.

Compatibility, Deprecation, and Migration Plan

  1. As the AlterPartition and Fetch requests are shared between ZK and Kraft mode, the related field will keep empty in ZK mode and will not be used.we can just let the ZK controller ignore the broker epoch field.
  2. The AlterPartition request will be protected by the ApiVersion bump. The brokers can use the controller-supported version accordingly.
  3. The metadata version/IBP version will be bumped to gate the new version of the Fetch request. After the IBP is upgraded, the new Fetch quest(version 15) will be usedThe API version will be bumped to avoid backward compatibility issues.

Test Plan

Will include system tests to simulate test by simulating the race scenario between of the broker registration delayed AlterPartitionRequest and the delayed AlterPartition requestbroker reboot.

Rejected Alternatives

Bump all the partition leader epochs when a broker registers.

...