Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Update RPCs to use simple names for submodule

...

Leader Progress Timeout

In the traditional push-based model, when a leader is disconnected from the quorum due to network partition, it will start a new election to learn the active quorum or form a new one immediately. In the pull-based model, however, say a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. that leader was not in the voters anymore and hence not receiving the BeginQuorumEpoch as well), then that old leader would not be notified by anyone about the new leader / epoch and become a pure "zombie leader", as there is no regular heartbeats being pushed from leader to the follower. This could lead to stale information being served to the observers and clients inside the cluster.

To resolve this issue, we will piggy-back on the "quorum.fetch.timeout.ms" config, such that if the leader did not receive Fetch requests from a majority of the quorum for that amount of time, it would begin a new election and start sending VoteRequest to voter nodes in the cluster to understand the latest quorum. If it couldn't connect to any known voter, the old leader shall keep starting new elections and bump the epoch. And if the returned response includes a newer epoch leader, this zombie leader would step down and becomes a follower. Note that the node will remain a candidate until it finds that it has been supplanted by another voter, or win the election eventually.

As we know from the Raft literature, this approach could generate disruptive voters when network partitions happen on the leader. The partitioned leader will keep increasing its epoch, and when it eventually reconnects to the quorum, it could win the election with a very large epoch number, thus reducing the quorum availability due to extra restoration time. Considering this scenario is rare, we would like to address it in a follow-up KIP.

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "VoteRequest",
  "validVersions": "0",
  "flexibleVersionflexibleVersions": "0+",
  "fields": [
    {  {"name": "ClusterId", "type": "string", "versions": "0+",
       "about" "Generated UUID for this cluster"},
      { "name": "Topics", "type": "[]VoteTopicRequestTopicData", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]VotePartitionRequestPartitionData", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
        {      {"name": "CandidateEpoch", "type": "int32", "versions": "0+",
              "about": "The bumped epoch of the candidate sending the request"},
              {{ "name": "CandidateId", "type": "int32", "versions": "0+",
              "about": "The ID of the voter sending the request"},
        {      {"name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
              "about": "The epoch of the last record written to the metadata log"},
              {{ "name": "LastOffset", "type": "int64", "versions": "0+",
              "about": "The offset of the last record written to the metadata log"}
            ]
      }
    }]
      }
  ]
}

As noted above, the request follows the convention of other batched partition APIs, such as Fetch and Produce. Initially we expect that only a single topic partition will be included, but this leaves the door open for "multi-raft" support.

...

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "VoteResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top level error code."},
     { "name": "Topics", "type": "[]VoteTopicResponseTopicData", 
       "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about"about": "The topic name." },
          { "name": "Partitions", "type": "[]VotePartitionResponsePartitionData", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
              {{ "name": "ErrorCode", "type": "int16", "versions": "0+"},
              {{ "name": "LeaderId", "type": "int32", "versions": "0+",
              "about": "The ID of the current leader or -1 if the leader is unknown."},
        {      {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
              "about": "The latest known leader epoch"},
        {      {"name": "VoteGranted", "type": "bool", "versions": "0+",
              "about": "True if the vote was granted and false otherwise"}
            ]
      }
    }]
      }        
  ]
}

Vote Request Handling

Note we need to respect the extra condition on leadership: the candidate’s log is at least as up-to-date as any other log in the majority who vote for it. Raft determines which of two logs is more "up-to-date" by comparing the offset and epoch of the last entries in the logs. If the logs' last entry have different epochs, then the log with the later epoch is more up-to-date. If the logs end with the same epoch, then whichever log is longer is more up-to-date. The Vote request includes information about the candidate’s log, and the voter denies its vote if its own log is more up-to-date than that of the candidate.

...

Request Schema

Code Block
{
  "apiKey": N51,
  "type": "request",
  "name": "BeginQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
     { {"name": "ClusterId", "type": "string", "versions": "0+",
       "about" "Generated UUID for this cluster"},
      { "name": "Topics", "type": "[]BeginQuorumTopicRequestTopicData", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]BeginQuorumPartitionRequestPartitionData", 
            "versions": "0+", "fields": [
              { "name": "PartitionIndex", "type": "int32", "versions": "0+",
              "about": "The partition index." },
              {{ "name": "LeaderId", "type": "int32", "versions": "0+",
                "about": "The ID of the newly elected leader"},
              {{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
                "about": "The epoch of the newly elected leader"}
            ]
          }
      ]}
  ]
}

Response Schema

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "BeginQuorumEpochResponse",
  "validVersions": "0",
  "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
		      "about": "The top level error code."},
      { "name": "Topics", "type": "[]BeginQuorumTopicResponseTopicData", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]BeginQuorumPartitionResponsePartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "fieldsabout": [
"The partition index." },
        { {"name": "ErrorCode", "type": "int16", "versions": "0+"},
           { {"name": "LeaderId", "type": "int32", "versions": "0+",
             "about": "The ID of the current leader or -1 if the leader is unknown."},
          {  {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
             "about": "The latest known leader epoch"}
            ]
          }
      ]}
  ]
}

BeginQuorumEpoch Request Handling

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "EndQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+",
     "about" "Generated UUID for this cluster"},
    { "name": "Topics", "type": "[]EndQuorumTopicRequestTopicData", 
      "versions": "0+", "fields": [
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]EndQuorumPartitionRequestPartitionData", 
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },    
   },
         { {"name": "ReplicaId", "type": "int32", "versions": "0+",
            "about": "The ID of the replica sending this request"},
        {    {"name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The current leader ID or -1 if there is a vote in progress"},
          {  {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The current epoch"},
			        { "name": "PreferredSuccessors", "type": "[]int32", "versions": "0+",
		          "about": "A sorted list of preferred successors to start the election"}
          ]
       }
    ]}      
  ]
}

Note that LeaderId and ReplicaId will be the same if the leader has been voted. If the replica is a candidate in a current election, then LeaderId will be -1.

...

Code Block
{
  "apiKey": N,
  "type": "response",
  "name": "EndQuorumEpochResponse",
  "validVersions": "0",
  "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
		      "about": "The top level error code."}, 
      { "name": "Topics", "type": "[]EndQuorumTopicResponseTopicData", 
        "versions": "0+", "fields": [
          { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
            "about": "The topic name." },
          { "name": "Partitions", "type": "[]EndQuorumPartitionResponse", PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "fields": [
    "0+",
          "about": "The partition index." },
        { {"name": "ErrorCode", "type": "int16", "versions": "0+"},
        {    {"name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
         {   {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
            ]
          }
      }          ]}
  ]
}

EndQuorumEpoch Request Handling

...

Current Leader Discovery: This proposal also extends the Fetch response to include a separate field for the receiver of the request to indicate the current leader. This is intended to improve the latency to find the current leader since otherwise a replica would need additional round trips. When a broker is restarted, it will rely on the Fetch protocol to find the current leader. 

We believe that using the same mechanism would be helpful for normal partitions as well since consumers have to use the Metadata API to find the current leader after received a NOT_LEADER_FOR_PARTITION error. However, this is outside the scope of this proposal.

...

Request Schema

Code Block
{
  "apiKey": 56N,
  "type": "request",
  "name": "DescribeQuorumRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DescribeQuorumTopicRequestTopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]DescribeQuorumPartitionRequestPartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]
      }]
    }
  ]
}

Response Schema

Code Block
{
  "apiKey": 56N,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
	      "about": "The top level error code."},
    { "name": "Topics", "type": "[]DescribeQuorumTopicResponseTopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]DescribeQuorumPartitionResponsePartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "HighWatermark", "type": "int64", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "TargetVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+"},
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"},
    ]}
  ]
}

DescribeQuorum Request Handling

...