Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Revisions on cluster bootstrapping and basic test plan

...

Leader Progress Timeout

In the traditional push-based model, when a leader is disconnected from the quorum due to network partition, it will start a new election to learn the active quorum or form a new one immediately. In the pull-based model, however, say a new leader has been elected with a new epoch and everyone has learned about it except the old leader (e.g. that leader was not in the voters anymore and hence not receiving the BeginQuorumEpoch as well), then that old leader would not be notified by anyone about the new leader / epoch and become a pure "zombie leader", as there is no regular heartbeats being pushed from leader to the follower. This could lead to stale information being served to the observers and clients inside the cluster.

To resolve this issue, we will piggy-back on the "quorum.fetch.timeout.ms" config, such that if the leader did not receive Fetch requests from a majority of the quorum for that amount of time, it would begin a new election and start sending VoteRequest to voter nodes in the cluster to understand the latest quorum. If it couldn't connect to any known voter, the old leader shall keep starting new elections and bump the epoch. And if the returned response includes a newer epoch leader, this zombie leader would step down and becomes a follower. Note that the node will remain a candidate until it finds that it has been supplanted by another voter, or win the election eventually.

As we know from the Raft literature, this approach could generate disruptive voters when network partitions happen on the leader. The partitioned leader will keep increasing its epoch, and when it eventually reconnects to the quorum, it could win the election with a very large epoch number, thus reducing the quorum availability due to extra restoration time. Considering this scenario is rare, we would like to address it in a follow-up KIP.

...

Code Block
{
  "type": "data",
  "name": "LeaderChangeMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      {"name": "ClusterId", "type": "string", "versions": "0+",
       "about" "Generated UUID for this cluster"},
	  {"name": "LeaderId", "type": "int32", "versions": "0+",
       "about": "The ID of the newly elected leader"},
      {"name": "VotedIds", "type": "[]int32", "versions": "0+",
       "about": "The IDs of the voters who voted for the current leader"},

  ]
}

The use of the clusterId field is described in more detail in the section on Cluster Bootstrapping. Essentially it is the responsibility of the first elected leader to initialize this value and for each subsequent leader to simply repeat it.

Also note that unlike other Kafka topic partition data whose log appends are persisted asynchronously, for this special quorum topic all log appends must be synced to FS before returning.

...

Code Block
{
  "apiKey": N,
  "type": "request",
  "name": "EndQuorumEpochRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TopicsClusterId", "type": "[]EndQuorumTopicRequeststring", 
      "versions": "0+",
     "fieldsabout": [
"Generated UUID for this cluster"},
    { "name": "TopicNameTopics", "type": "string[]EndQuorumTopicRequest", 
      "versions": "0+", "fields": [
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]EndQuorumPartitionRequest", 
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },    
            {"name": "ReplicaId", "type": "int32", "versions": "0+",
            "about": "The ID of the replica sending this request"},
            {"name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The current leader ID or -1 if there is a vote in progress"},
            {"name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The current epoch"},
			 {"name": "PreferredSuccessors", "type": "[]int32", "versions": "0+",
		      "about": "A sorted list of preferred successors to start the election"}
          ]
       }
    }      
  ]
}

...

Code Block
{
  "apiKey": 1,
  "type": "request",
  "name": "FetchRequest",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    // ---------- Start new field ----------
    { "name": "ReplicaIdClusterId", "type": "int32string", "versions" "12+", "nullableVersions": "0+"12+", "default": "null", "taggedVersions": "12+", "tag": 1,
      "about": "The brokerclusterId IDif ofknown. theThis follower, of is used to validate metadata fetches prior to broker registration." },
    // ---------- End new field ----------
    { "name": "ReplicaId", "type": "int32", "versions": "0+",
      "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
    { "name": "MaxWaitTimeMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false,
      "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records" },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": false,
      "about": "The fetch session epoch, which is used for ordering requests in a session" },
    { "name": "Topics", "type": "[]FetchableTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "FetchPartitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "FetchOffset", "type": "int64", "versions": "0+",
          "about": "The message offset." },
		// ---------- Start new field ----------
        { "name": "FetchEpoch", "type": "int32", "versions": "12+", "default": "-1", "taggedVersions": "12+", "tag": 2,
          "about": "The epoch of the last replicated record"},
		// ---------- End new field ----------
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": false,
          "about": "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower."},
        { "name": "MaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." }
      ]}
    ]},
    { "name": "Forgotten", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,
      "about": "In an incremental fetch request, the partitions to remove.", "fields": [
      { "name": "Name", "type": "string", "versions": "7+", "entityType": "topicName",
        "about": "The partition name." },
      { "name": "ForgottenPartitionIndexes", "type": "[]int32", "versions": "7+",
        "about": "The partitions indexes to forget." }
    ]},
    { "name": "RackId", "type":  "string", "versions": "11+", "default": "", "ignorable": true,
      "about": "Rack ID of the consumer making this request"}
  ]
}

...

Code Block
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partiitonpartition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log start offset." },
	    // ---------- Start new field ----------
        { "name": "NextOffsetAndEpoch", "type": "OffsetAndEpoch", 
          "versions": "12+", "taggedVersions": "12+", "tag": 0, "fields": [
          { "name": "NextFetchOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should truncate to"},
          { "name": "NextFetchOffsetEpoch", "type": "int32", "versions": "0+",
            "about": "The epoch of the next offset in case the follower needs to truncate"},        
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", 
          "versions": "12+", "taggedVersions": "12+", "tag": 1, fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
		// ---------- End new field ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}

...

When a leader receives a FetchRequest, it must check the following:

  1. Check that the clusterId if not null matches the cached value in meta.properties.
  2. First ensure that the leader epoch is the same. If not, reject this request with either the FENCED_LEADER_EPOCH or UNKNOWN_LEADER_EPOCH error.
    1. If the leader epoch is smaller, then eventually this leader's BeginQuorumEpoch would reach the voter and that voter would update the epoch.
    2. If the leader epoch is larger, then eventually the receiver would learn about the new epoch anyways. Actually this case should not happen since, unlike the normal partition replication protocol, leaders are always the first to discover that they have been elected.
  3. Check that the epoch on the FetchOffset's  FetchEpoch are consistent with the leader's log. Specifically we check that FetchOffset is less than or equal to the end offset of FetchEpoch. If not, return OUT_OF_RANGE and encode the next FetchOffset as the last offset of the largest epoch which is less than or equal to the fetcher's epoch. This is a heuristic of truncating to let the voter truncate as much as possible to get to the starting-divergence point with fewer Fetch round-trips: if the fetcher's epoch is X which does not match the epoch of that fetching offset, then it means all records of epoch X on that voter may have diverged and hence could be truncated, then returning the next offset of largest epoch Y (< X) is reasonable.
  4. If the request is from a voter not an observer, the leader can possibly advance the high-watermark. As stated above, we only advance the high-watermark if the current leader has replicated at least one entry to majority of quorum to its current epoch. Otherwise, the high watermark is set to the maximum offset which has been replicated to a majority of the voters.

...

When the cluster is initialized for the first time, the voters will find each other through the static quorum.voters configuration. The first leader that is elected is responsible for writing the initial leader change message as specified above and generating a UUID to serve as the clusterId. This clusterId is propagated to the cluster through replication of the metadata log and cached in meta.properties as soon as the value is known to be committed.

Brokers that are restarted will validate the clusterId through a process similar to what is done today. Before attempting to fetch from the current leader, a broker will send a Metadata request for the metadata topic (specified by KIP-631) to one of the voters. The response will indicate the current leader (if there is one) and the current clusterId (which will be null before the initial leader election). The broker will validate this result against the value cached in meta.properties.

It is the job of the first elected leader (i.e. the first controller) to generate a UUID that will serve as a unique clusterId. We expect this to happen within the controller state machine that defined by KIP-631. This ID will be stored in the metadata log as a message and will be propagated to all brokers in the cluster through the replication protocol defined by this proposal. (From an implementation perspective, the Raft library will provide a hook for the initialization of the clusterId.)

As happens today, the clusterId replicated through the metadata log will be stored in meta.properties. When a broker is restarted, it will compare its cached clusterId with whatever ID is discovered dynamically. If the IDs do not match, then the broker shuts down. The purpose of this is to limit the impact of a misconfiguration which causes the broker to connect to the wrong cluster.

We would like to have the same protection once Zookeeper is gone, but it is more challenging since replication of the metadata log itself can be destructive. For example, a broker connecting to the wrong cluster may end up truncating its metadata log incorrectly through the course of replication. A rogue broker can even cause an invalid leader to get elected, which risks the loss of committed data.

The approach we take here to address this problem is to have the core Raft requests include a field for the clusterId. This is validated upon receipt and The invariant that we are trying to enforce is that no follower may fetch from a leader which has an inconsistent clusterId. However, we also need to protect leader election itself since there may not be a leader at the time that the broker is started. Hence we have added the clusterId as a field to the Vote and BeginQuorumEpoch requests. A voter will reject a request with the INVALID_CLUSTER_ID error code is returned if the requested clusterId does values do not match its own cached value. Note that . If the clusterId is only included in requests and validated if its value is known to have been committed. This protects from the case where the initial leader fails before its LeaderChange message can be committed to the log.

In summary, the initialization process for a voter will be the following:

  1. Send Metadata request to any of the current voters to find the current leader. 
    1. If a clusterId is received in a Metadata response, validate it against the current cached value if present. 
    2. If no clusterId is received and the broker has a non-null cached clusterId, then continue retrying the Metadata request until the election timeout expires.
  2. If the election timeout expires, become a candidate and send Vote requests including the current cached clusterId (or null if there is none).
    1. If the response indicates INVALID_CLUSTER_ID, the broker will not fail since the inconsistent broker might be in the minority. Instead it will continue retrying until the election completes.
  3. If at any time the broker receives a BeginQuorumEpoch request with an inconsistent clusterId, then the broker will terminate.

For an observer, it is even simpler. It would do the same validation in step 1, but in the case that no leader can be found, it would continue retrying indefinitely.

Tooling Support

We will add a new utility called kafka-metadata-quorum.sh to describe and alter quorum state. As usual, this tool will require --bootstrap-server to be provided.  We will support the following options:

Describing Current Status

not known (perhaps because the broker is being initialized for the first time), then it will specify a clusterId value of "null" and the destination broker will skip validation. This is not a 100% bulletproof solution because we have to allow this exemption for new brokers. It is still possible, for example, if a broker with empty state is started with the ID of an existing voter, which can lead to correctness violations. However, it addresses a large class of common misconfiguration cases and is no worse than what Kafka can protect against today.

Note that there is one subtlety to this proposal. If there is no active leader, which node should be trusted as the authoritative source for the clusterId? If we blindly shutdown the broker after any INVALID_CLUSTER_ID error, then a single broker could end up killing a majority of valid voters in the cluster. To address this issue, we only treat mismatched clusterId errors as fatal when the broker's value conflicts with that of an elected leader. So an INVALID_CLUSTER_ID error in a Vote request is simply treated as a vote rejection, but the same error from a Fetch is deemed fatal.

The exact initialization process for a voter when it starts up will be the following:

  1. Send a Fetch request to any of the current voters including the last known epoch from quorum-state and the clusterId from meta.properties (if there is one):
    1. If the response indicates INVALID_CLUSTER_ID, then the broker will shutdown.
    2. If no clusterId is received and the broker has a non-null cached clusterId, then continue retrying the Fetch request against a random voter until the election timeout expires.
  2. If the election timeout expires, become a candidate and send Vote requests including the current cached clusterId (or null if there is none).
    1. If the response indicates INVALID_CLUSTER_ID, the broker will not fail since the inconsistent broker might be in the minority. Instead it will continue retrying until the election completes.
  3. If at any time the broker receives a BeginQuorumEpoch request from an elected leader with an inconsistent clusterId, then the broker will terminate.
  4. If the broker receives a Vote request from one of the voters, the clusterId is not null, and the value not match the cached value, then reject the vote and return INVALID_CLUSTER_ID

For an observer, it is even simpler. It would do the same validation in step 1, but in the case that no leader can be found, it would continue retrying indefinitely. If the observer has no clusterId, then it will send Fetch requests with a null clusterId. Once a leader is elected and the clusterId message has become committed, then the observer will update meta.properties and begin including the clusterId in all future Fetch requests.

Tooling Support

We will add a new utility called kafka-metadata-quorum.sh to describe and alter quorum state. As usual, this tool will require --bootstrap-server to be provided.  We will support the following options:

Describing Current Status

There will be two options available with -There will be two options available with --describe: 

  • --describe status: a short summary of the quorum status and the other provides detailed information about the status of replication.
  • --describe replication: provides detailed information about the status of replication

...

  • Brokers can directly update ZK for shrinking / expanding ISR; this will be replaced with AlterISR request sent from leaders to the controller (KIP-497: Add inter-broker API to alter ISR). The controller would then update the metadata by appending a batch of entries to its metadata log where each topic-partition represents one entry.
  • Admin requests for reassign replicas will be replaced with an AlterPartitionAssignments request to the controller (KIP-455: Create an Administrative API for Replica Reassignment). The controller would update the metadata by appending a batch of entries to its metadata log where each topic-partition represents one entry.
  • Existing admin request for config changes etc will be translated to a batch of updates to its metadata log.

Log Compaction and Snapshots

The metadata log described in this KIP can grow unbounded. As discussed in the introduction, our approach to managing the size of the replicated log is described in a separate proposal: KIP-630: Kafka Raft Snapshot. Each broker in KIP-500 will be an observer of the metadata log and will materialize the entries into a cache of some kind. New and slow brokers will catch up to the leader by 1) fetching the snapshot and 2) fetching the replicated log after the snapshot. This will provide a stronger guarantee on the consistency of the metadata than is possible today.

Quorum Performance

  • will be replaced with AlterISR request sent from leaders to the controller (KIP-497: Add inter-broker API to alter ISR). The controller would then update the metadata by appending a batch of entries to its metadata log where each topic-partition represents one entry.
  • Admin requests for reassign replicas will be replaced with an AlterPartitionAssignments request to the controller (KIP-455: Create an Administrative API for Replica Reassignment). The controller would update the metadata by appending a batch of entries to its metadata log where each topic-partition represents one entry.
  • Existing admin request for config changes etc will be translated to a batch of updates to its metadata log.

Log Compaction and Snapshots

The metadata log described in this KIP can grow unbounded. As discussed in the introduction, our approach to managing the size of the replicated log is described in a separate proposal: KIP-630: Kafka Raft Snapshot. Each broker in KIP-500 will be an observer of the metadata log and will materialize the entries into a cache of some kind. New and slow brokers will catch up to the leader by 1) fetching the snapshot and 2) fetching the replicated log after the snapshot. This will provide a stronger guarantee on the consistency of the metadata than is possible today.

Quorum Performance

The goal for Raft quorum is to replace Zookeeper dependency and reach higher performance for metadata operations. In the first version, we will be building necessary metrics to monitor the end-to-end latency from admin request (AlterPartitionReassignments) and client request being accepted to being committed. We shall monitor the time spent on local, primarily the time to fsync the new records and time to apply changes to the state machine, which may not be really a trivial operation. Besides we shall also monitor the time used to propagate change on the remote, I.E. latency to advance the high watermark. Benchmarks will also be built to compare the efficiency for a 3-node broker cluster using Zookeeper vs Raft, under heavy load of metadata changes. We shall also be exploring existing distributed consensus system load frameworks at the same time, but this may beyond the scope of KIP-595. 

Test Plan

Raft is a well-studied consensus protocol, but there are some notable differences from the classic protocol in this proposal. As we did for the Kafka replication protocol in KIP-320, we will use model checking to validate the replication semantics of this protocol. (Note that validation of the controller state machine will be handled as part of KIP-631.)

Our primary method for testing the implementation is through Discrete Event Simulation (DES). Our prototype implementation already includes the basic framework. DES allows us to test a large number of deterministically generated random scenarios which include various kinds of faults (such as network partitions). It allows us to define system invariants programmatically which are then checked after each step in the simulation. This has already been extremely useful identifying bugs throughout development of the prototype.

Other than that, we will use the typical suite of unit/integration/system tests The goal for Raft quorum is to replace Zookeeper dependency and reach higher performance for metadata operations. In the first version, we will be building necessary metrics to monitor the end-to-end latency from admin request (AlterPartitionReassignments) and client request being accepted to being committed. We shall monitor the time spent on local, primarily the time to fsync the new records and time to apply changes to the state machine, which may not be really a trivial operation. Besides we shall also monitor the time used to propagate change on the remote, I.E. latency to advance the high watermark. Benchmarks will also be built to compare the efficiency for a 3-node broker cluster using Zookeeper vs Raft, under heavy load of metadata changes. We shall also be exploring existing distributed consensus system load frameworks at the same time, but this may beyond the scope of KIP-595. 

Rejected Alternatives

Use an existing Raft library: Log replication is at the core of Kafka and the project should own it. Dependence on a third-party system or library would defeat one of the central motivations for KIP-500. There would be no easy way to evolve a third-party component according to the specific needs of Kafka. For example, we may eventually use this protocol for partition-level replication, but it would make compatibility much more difficult if we cannot continue to control the log layer. So if we must control both the log layer and the RPC protocol, then the benefit of a third-party library is marginal and the cost is an unnecessary constraint on future evolution. Furthermore, Raft libraries typically bring in their own RPC mechanism, serialization formats, have their own monitoring, logging, etc. All of this requires additional configuration the user needs to understand.

...