...

In the context of Raft, it is very common for leaders to hit a network partition. We implemented the progress timeout in KIP-595 so that when a majority of voters are not fetching from the leader, the current leader will start an election to reach out to the other followers and either re-establish its leadership or learn about the new leader. However, if the zombie leader cannot connect to the quorum for long enough, it will time out and start an election, bumping its epoch. Until it eventually reconnects to the quorum, the zombie leader will repeatedly bump its epoch on every election timeout. The consequence is that the zombie leader will gain a much higher epoch than the other voters inside the quorum and disrupt the quorum once it reconnects, introducing extra unavailability. During quorum reassignment, the removed voters could also try to start an election to ask the active leader to step down, which affects the ongoing transition. This is called a "disruptive server" in the Raft literature. Furthermore, the rapid bumping of the epoch increases the chance of epoch overflow, which is very dangerous in Kafka Raft because we have been using a 32-bit integer for the epoch due to historical message formats, rather than a 64-bit one.

...

Code Block
{
  "apiKey": 50,
  "type": "request",
  "name": "VoteRequest",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
          "about": "The bumped epoch of the candidate sending the request"},
        { "name": "CandidateId", "type": "int32", "versions": "0+",
          "about": "The ID of the voter sending the request"},
        { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
          "about": "The epoch of the last record written to the metadata log"},
        { "name": "LastOffset", "type": "int64", "versions": "0+",
          "about": "The offset of the last record written to the metadata log"},
        { "name": "PreVote", "type": "boolean", "versions": "1+", // NEW
          "about": "Whether the given request is a pre-vote"}
      ]
      }
    ]
    }
  ]
}

The Raft node will either approve or reject the pre-vote request, in the same way as it responds to a normal vote request. If the pre-vote fails, the candidate will not proceed to the actual vote.
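The pre-vote behaviour described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the KIP's or Kafka's actual implementation: the class `PreVoteSketch`, its `VoterState` type, and the methods `handleVote` and `epochAfterPreVote` are hypothetical names, and the rule that a pre-vote leaves the voter's epoch and vote untouched is taken from the general Raft pre-vote design rather than from this text.

```java
final class PreVoteSketch {

    /** Hypothetical, simplified voter state; not Kafka's actual classes. */
    static final class VoterState {
        int epoch;
        int votedFor = -1;            // -1: no vote granted in the current epoch
        final int lastOffsetEpoch;    // epoch of the last record in the local log
        final long lastOffset;        // offset of the last record in the local log

        VoterState(int epoch, int lastOffsetEpoch, long lastOffset) {
            this.epoch = epoch;
            this.lastOffsetEpoch = lastOffsetEpoch;
            this.lastOffset = lastOffset;
        }
    }

    /** True if the candidate's log is at least as up to date as the voter's. */
    static boolean candidateLogUpToDate(VoterState v, int candLastOffsetEpoch, long candLastOffset) {
        if (candLastOffsetEpoch != v.lastOffsetEpoch) {
            return candLastOffsetEpoch > v.lastOffsetEpoch;
        }
        return candLastOffset >= v.lastOffset;
    }

    /** Voter side: the same checks serve both request kinds; only a real vote mutates state. */
    static boolean handleVote(VoterState v, int candidateId, int candidateEpoch,
                              int candLastOffsetEpoch, long candLastOffset, boolean preVote) {
        if (candidateEpoch < v.epoch) {
            return false;             // stale candidate
        }
        boolean upToDate = candidateLogUpToDate(v, candLastOffsetEpoch, candLastOffset);
        if (preVote) {
            // Assumption: a pre-vote is only a poll, so epoch and votedFor stay unchanged.
            return upToDate;
        }
        if (candidateEpoch > v.epoch) {
            v.epoch = candidateEpoch; // a real vote request moves the voter to the new epoch
            v.votedFor = -1;
        }
        if (!upToDate || (v.votedFor != -1 && v.votedFor != candidateId)) {
            return false;
        }
        v.votedFor = candidateId;
        return true;
    }

    /** Candidate side: the epoch is bumped only after a majority granted the pre-vote. */
    static int epochAfterPreVote(int currentEpoch, int grantedIncludingSelf, int voterCount) {
        boolean majority = grantedIncludingSelf > voterCount / 2;
        return majority ? currentEpoch + 1 : currentEpoch;
    }

    public static void main(String[] args) {
        VoterState voter = new VoterState(5, 5, 100L);
        boolean granted = handleVote(voter, 2, 6, 5, 100L, true);
        System.out.println("pre-vote granted=" + granted + ", voter epoch=" + voter.epoch);
        // A partitioned node that only hears from itself in a 3-voter quorum keeps its epoch.
        System.out.println("epoch after failed pre-vote=" + epochAfterPreVote(5, 1, 3));
    }
}
```

In this sketch a partitioned node never collects a pre-vote majority, so its epoch stays put however many election timeouts fire, which is the property that prevents the disruptive epoch growth described earlier.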


For linearizable reads, a separate RPC called ReadOffset shall be implemented so that a non-leader node can query the current applied offset of the active leader's state machine.

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "ReadOffsetRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "ReadTimestamp", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The timestamp of the read query."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]
      }
    ]
    }
  ]
}


Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "ReadOffsetResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null"},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AppliedOffset", "type": "int64", "versions": "0+",
          "about": "The last applied offset on the leader." }
      ]
      }
    ]
    }
  ]
}
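One way a non-leader node might use the ReadOffset RPC for a linearizable query is sketched below. This is an assumption about the read path, following the usual Raft read-index pattern, not something the schemas above specify: the follower takes the `AppliedOffset` returned by the leader and serves the read locally only once its own state machine has applied at least that offset. `ReadOffsetSketch`, `canServeRead`, and `awaitReadable` are hypothetical names, and the polling loop stands in for whatever wait mechanism a real implementation would use.

```java
import java.util.function.LongSupplier;

final class ReadOffsetSketch {

    /** True once the local state machine has caught up to the leader's applied offset. */
    static boolean canServeRead(long leaderAppliedOffset, long localAppliedOffset) {
        return localAppliedOffset >= leaderAppliedOffset;
    }

    /**
     * Polls the local applied offset up to maxPolls times.
     * Returns true if the read can be served, false if the follower did not catch up in time.
     */
    static boolean awaitReadable(long leaderAppliedOffset, LongSupplier localAppliedOffset, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            if (canServeRead(leaderAppliedOffset, localAppliedOffset.getAsLong())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long leaderAppliedOffset = 10L;   // value taken from ReadOffsetResponse.AppliedOffset
        long[] local = {7L};              // simulated follower that applies one record per poll
        boolean ok = awaitReadable(leaderAppliedOffset, () -> local[0]++, 5);
        System.out.println("read can be served locally: " + ok);
    }
}
```

If the follower does not catch up within its budget, a real implementation would presumably retry or fail the query rather than answer from stale state.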

Compatibility, Deprecation, and Migration Plan

...