Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder DiscussionAccepted [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here

JIRA: here

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-9539

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": 5,
  "type": "request",
  "name": "StopReplicaRequest",
  // Version 1 adds the broker epoch and reorganizes the partitions to be stored
  // per topic.
  //
  // Version 2 is the first flexible version.
  //
  // Version 3 reorganizes the partitions to be stored, adds the leader epoch
  // and the delete partition fields per partition (KIP-570).
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The controller id." },
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The controller epoch." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "1+", "default": "-1", "ignorable": true,
      "about": "The broker epoch." },
    { "name": "DeletePartitions", "type": "bool", "versions": "0+-2",
      "about": "Whether these partitions should be deleted." },
    { "name": "UngroupedPartitions", "type": "[]StopReplicaPartitionV0", "versions": "0",
      "about": "The partitions to stop.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "PartitionIndex", "type": "int32", "versions": "0",
        "about": "The partition index." }
    ]},
    { "name": "Topics", "type": "[]StopReplicaTopicStopReplicaTopicV1", "versions": "1+-2",
      "about": "The topics to stop.", "fields": [
      { "name": "Name", "type": "string", "versions": "1+-2", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "1+-2",
        "about": "The partition indexes." }
    ]},
	  // New Field
	 Structure from V3 on
    { "name": "TopicStates", "type": "[]StopReplicaTopicState", "versions": "3+",
      "about": "Each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "3+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "PartitionStates", "type": "[]StopReplicaPartitionState", "versions": "3+",
        "about": "The state of each partition", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "3+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "3+", "default": "-1",
          "about": "The leader epoch." },
        { "name": "DeletePartition", "type": "bool", "versions": "3+",
          "about": "Whether this partition should be deleted." }
      ]}
    ]}
  ]
}

Proposed Changes

The controller will include the leader epoch of each partition when sending out an StopReplicaRequest. The broker will verify the epoch of each partitions and send an `STALE`FENCED_CONTROLLERLEADER_EPOCH` error when the leader epoch received is older than the known one. `STALE_CONTROLLER_EPOCH` is used to remain consistent with the LeaderAndIsr API. When a topic is deleted, the leader epoch is not bumped. In this case, we will send a sentinel (-12) which overrides any existing epoch. Older version of the request will use a sentinel (-1) to indicate the the leader epoch is not present when the controller is still on the old version during the upgrade.

Starting from V3, only one StopReplica request will be sent by the controller, combining the partitions to be deleted and the partitions to stopped only.

Compatibility, Deprecation, and Migration Plan

...