Status

Current state: Draft

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Unlike the LeaderAndIsrRequest, the StopReplicaRequest does not include the leader epoch, which makes it vulnerable to reordering. This KIP proposes adding the leader epoch of each partition to the StopReplicaRequest so that the broker can verify the epoch before acting on the request.

Public Interfaces

We will bump the version of the StopReplicaRequest/StopReplicaResponse and add the leader epoch for each partition in the request.

{
  "apiKey": 5,
  "type": "request",
  "name": "StopReplicaRequest",
  // Version 1 adds the broker epoch and reorganizes the partitions to be stored
  // per topic.
  //
  // Version 2 is the first flexible version.
  //
  // Version 3 adds the leader epoch per partition.
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The controller id." },
    { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
      "about": "The controller epoch." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "1+", "default": "-1", "ignorable": true,
      "about": "The broker epoch." },
    { "name": "DeletePartitions", "type": "bool", "versions": "0+",
      "about": "Whether these partitions should be deleted." },
    { "name": "UngroupedPartitions", "type": "[]StopReplicaPartitionV0", "versions": "0",
      "about": "The partitions to stop.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "PartitionIndex", "type": "int32", "versions": "0",
        "about": "The partition index." }
    ]},
    { "name": "Topics", "type": "[]StopReplicaTopic", "versions": "1+",
      "about": "The topics to stop.", "fields": [
      { "name": "Name", "type": "string", "versions": "1+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "1+",
        "about": "The partition indexes." },
      // New field
      { "name": "LeaderEpoch", "type": "int32", "versions": "3+",
        "about": "The leader epoch." }
    ]}
  ]
}

Proposed Changes

The controller will include the leader epoch of each partition when sending a StopReplicaRequest. The broker will verify the epoch of each partition and return a `STALE_CONTROLLER_EPOCH` error when the leader epoch received is older than the one it knows. `STALE_CONTROLLER_EPOCH` is used to remain consistent with the LeaderAndIsr API.
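
The broker-side check described above can be sketched as follows. This is only an illustration in Python, not the broker implementation: `ReplicaState`, `stop_replica`, and the `Errors` enum are hypothetical names, and the handling of a partition with no known epoch (the request is allowed to proceed) is an assumption that this KIP does not specify.

```python
from dataclasses import dataclass, field
from enum import Enum


class Errors(Enum):
    NONE = "NONE"
    STALE_CONTROLLER_EPOCH = "STALE_CONTROLLER_EPOCH"


@dataclass
class ReplicaState:
    """Hypothetical stand-in for the broker's per-partition state."""
    # Maps (topic, partition) to the latest leader epoch the broker knows.
    leader_epochs: dict = field(default_factory=dict)
    # Partitions whose replica has been stopped.
    stopped: set = field(default_factory=set)

    def stop_replica(self, topic, partition, request_leader_epoch):
        key = (topic, partition)
        known_epoch = self.leader_epochs.get(key)
        # Reject the request only when its epoch is older than the known one.
        if known_epoch is not None and request_leader_epoch < known_epoch:
            return Errors.STALE_CONTROLLER_EPOCH
        # Assumption: an equal or newer epoch, or an unknown partition, proceeds.
        self.stopped.add(key)
        return Errors.NONE


state = ReplicaState(leader_epochs={("orders", 0): 5})
stale = state.stop_replica("orders", 0, 4)    # request epoch 4 is older than 5
current = state.stop_replica("orders", 0, 5)  # request epoch matches the known one
```

In this sketch the reordered request carrying epoch 4 is rejected and leaves the replica untouched, while the request carrying the current epoch stops it.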

Compatibility, Deprecation, and Migration Plan

The change is backward compatible with older brokers.

Rejected Alternatives

N/A