...

  • bootstrap.servers:  Defines the set of servers to contact to get quorum information. This does not have to address quorum members directly. For example, it could be a VIP.
  • bootstrap.quorum.voters: Defines the ids of the expected voters. This is only required when bootstrapping the cluster for the first time. Once the cluster (and hence the quorum) has started, new brokers will rely on DiscoverBrokers (described below) to discover the current voters of the quorum.
  • quorum.fetch.timeout.ms: Maximum time without a successful fetch from the current leader before a new election is started.
  • quorum.election.timeout.ms: Maximum time spent without collecting a majority of votes during the candidate state before a new election is retried.
  • quorum.election.backoff.max.ms: Maximum exponential backoff time (based on the number of retries) after an election timeout, before a new election is triggered.
  • quorum.request.timeout.ms: Maximum time before a pending request is considered failed and the connection is dropped.
  • quorum.retry.backoff.ms: Initial delay between request retries.
  • quorum.retry.backoff.max.ms: Max delay between requests. Backoff will increase exponentially beginning from quorum.retry.backoff.ms (the same as in KIP-580).
  • broker.id: The existing broker id config shall be used as the voter id in the Raft quorum. An illustrative example of these configs is shown below.
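To make these configs concrete, the snippet below assembles them into a broker Properties object. This is purely illustrative: the timeout and backoff values are arbitrary examples chosen for the sketch, not defaults proposed by this KIP.

Code Block
// Illustrative only: example values for the configs listed above. The specific
// timeouts shown here are hypothetical, not values mandated by this KIP.
import java.util.Properties;

public class QuorumConfigExample {

    public static Properties exampleBrokerProps() {
        Properties props = new Properties();
        props.put("broker.id", "1");                                   // also used as the Raft voter id
        props.put("bootstrap.servers", "broker-vip.example.com:9092"); // may be a VIP rather than the voters themselves
        props.put("bootstrap.quorum.voters", "1,2,3");                 // only needed when bootstrapping the cluster
        props.put("quorum.fetch.timeout.ms", "2000");
        props.put("quorum.election.timeout.ms", "1000");
        props.put("quorum.election.backoff.max.ms", "5000");
        props.put("quorum.request.timeout.ms", "30000");
        props.put("quorum.retry.backoff.ms", "100");
        props.put("quorum.retry.backoff.max.ms", "1000");
        return props;
    }
}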

...

This section discusses the process for establishing the quorum when the cluster is first initialized and afterwards when nodes are restarted.  

When first initialized, we rely on the bootstrap.quorum.voters configuration to define the expected voters. Brokers will only rely on this when starting up if there is no quorum state written to the log and if the broker fails to discover an existing quorum through the DiscoverBrokers API. We suggest that a --bootstrap flag could be passed through kafka-server-start.sh when the cluster is first started to add some additional safety. In that case, we would only use the config when the flag is specified and otherwise expect to discover the quorum dynamically.

There are essentially two problems when a broker starts up:

  1. How to find the connection information of the other brokers and to advertise our own connection information
  2. How to find the current status of the quorum (i.e. which node is the leader and who are the other voters)

As with Kafka today, we assume that each broker has an advertised listener for inter-broker communications. A key question in this design concerns the permanence of the advertised listener. When a broker is restarted, is it safe to assume that it will re-register with the same advertised listener or not? Kafka today sets no requirement on this; a broker can change its advertised listener every time it restarts. This can be easily supported because the Zookeeper connection information does not change and the broker can use Zookeeper for broker discovery. Without Zookeeper to lean on, we must either have stricter constraints on the mutability of the advertised listener, or we must have a dynamic mechanism to discover changes.

To illustrate the problem, consider the case when all brokers are shut down and then restarted. When the first brokers are starting up, there will not be a sufficient number of them to form a majority and elect a leader for the metadata log. In this case we cannot rely on the metadata log for registration the way we can rely on Zookeeper today. At a minimum, the current voters need some way to find each other in order to elect a leader. If we are allowed to assume that the advertised listeners do not change, then each voter's advertised listener could be written into the `quorum-state` file, for example. However, if we cannot assume a static advertised listener, then we need a different approach.

A second case occurs when new brokers are added to the cluster. The original voter configuration specified by `bootstrap.quorum.voters` may no longer be relevant since the voters may have been changed by a dynamic reassignment. So how should the new brokers discover where the current voters are and how to connect to them?

The approach we take here is intended to give the most flexibility to different environments. We have added a `bootstrap.servers` configuration, which is used in a similar way to the same configuration in clients. When a broker first starts up, it will attempt to connect to one of the endpoints specified by this configuration in order to discover other brokers and the quorum. The endpoints defined by this config could point to a load balancer or a VIP; they could also point to an explicit set of brokers. Once the broker has connected, it will use a new `AdvertiseBroker` API in order to install its endpoint information and make itself reachable to other nodes. The same API is used to discover other nodes in the cluster.

Once connected to the cluster, the broker will send a `Fetch` to any one of the existing brokers in order to find the current leader.

One failure case we must handle is when the broker cannot find connection information for any of the voters.

Assuming the broker fails to discover an existing quorum, it will then check its broker.id to see if it is expected to be one of the initial voters. If so, it will immediately include itself in DiscoverBrokers responses. Until the quorum is established, brokers will send DiscoverBrokers to the nodes in bootstrap.servers and to other discovered voters in order to find the other expected members. Once enough brokers are known, the brokers will begin a vote to elect the first leader.

...

  1. Upon starting up, a broker always tries to bootstrap its knowledge of the quorum by first reading the quorum-state file and then scanning forward from AppliedOffset to the end of the log to see if there are any changes to the quorum state. For a newly started broker, both the log and the file are empty, so no previous knowledge can be restored.
  2. If after step 1), there's some known quorum state along with a leader / epoch already, the broker would:
    1. Promote itself from observer to voter if it finds out that it's a voter for the epoch.
    2. Start sending Fetch requests to the current leader it knows of (which may not actually be the leader of the latest epoch).
  3. Otherwise, it will try to learn the quorum state by sending DiscoverBrokers to any other brokers inside the cluster via bootstrap.servers, as the second option for quorum state discovery.
    1. As long as a broker does not know the connections of all the current quorum voters, it should continue to periodically ask other brokers via DiscoverBrokers.
  4. Send out a Metadata request to the discovered brokers to find the current metadata partition leader.
    1. As long as a broker does not know the current quorum (including the leader and the voters), it should continue to periodically ask other brokers via Metadata.
  5. If even steps 3) and 4) cannot find any quorum information – e.g. when there are no other brokers in the cluster, or a network partition prevents this broker from talking to others in the cluster – fall back to the third option of quorum state discovery by checking whether it is among the brokers listed in bootstrap.quorum.voters.
    1. If so, it will promote itself to voter state, add its own connection information to the cached quorum state, and return that in the DiscoverBrokers responses it sends to other brokers; otherwise it stays in observer state.
    2. In either case, it continues to try to send DiscoverBrokers to all other brokers in the cluster via bootstrap.servers.
  6. For any voter, after it has learned of a majority of the voters in the expected quorum from DiscoverBrokers responses, it will begin a vote (a sketch of this startup flow follows the list).
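The sketch below summarizes the fallback order of steps 1-6 as a single decision method. All names in it (KnownQuorum, startupAction, and so on) are hypothetical placeholders used only to illustrate the flow; they are not APIs proposed by this KIP.

Code Block
// A minimal sketch of the startup fallback order in steps 1-6 above.
// All names here are hypothetical placeholders, not proposed APIs.
import java.util.Optional;
import java.util.Set;

public class StartupDiscoverySketch {

    enum Role { OBSERVER, VOTER }

    static class KnownQuorum {
        final Optional<Integer> leaderId;  // empty if no leader is known
        final Set<Integer> voters;         // voter ids known for the current epoch
        KnownQuorum(Optional<Integer> leaderId, Set<Integer> voters) {
            this.leaderId = leaderId;
            this.voters = voters;
        }
    }

    // Decide what a broker should do on startup.
    static String startupAction(int brokerId,
                                Optional<KnownQuorum> fromStateFileAndLog,  // step 1
                                Optional<KnownQuorum> fromDiscovery,        // steps 3-4
                                Set<Integer> bootstrapQuorumVoters) {       // step 5 fallback
        // Step 2: the quorum-state file plus the log scan already tell us the quorum.
        if (fromStateFileAndLog.isPresent()) {
            KnownQuorum known = fromStateFileAndLog.get();
            Role role = known.voters.contains(brokerId) ? Role.VOTER : Role.OBSERVER;
            return "As " + role + ", fetch from known leader " + known.leaderId.orElse(-1);
        }
        // Steps 3-4: learn the quorum via DiscoverBrokers / Metadata through bootstrap.servers.
        if (fromDiscovery.isPresent() && fromDiscovery.get().leaderId.isPresent()) {
            return "Fetch from discovered leader " + fromDiscovery.get().leaderId.get();
        }
        // Step 5: fall back to the static bootstrap.quorum.voters config.
        if (bootstrapQuorumVoters.contains(brokerId)) {
            // Step 6: once a majority of the expected voters is known, begin a vote.
            return "Promote to VOTER, advertise self in DiscoverBrokers responses, "
                 + "and begin a vote once a majority of voters is known";
        }
        return "Stay OBSERVER and keep retrying DiscoverBrokers via bootstrap.servers";
    }
}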

...

Code Block
{
  "apiKey": 56,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "type": "[]DescribeQuorumTopicResponse",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]DescribeQuorumPartitionResponse",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "HighWatermark", "type": "int64", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "TargetVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+"},
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"},
      { "name": "LastCaughtUpTimeMs", "type": "int64", "versions": "0+",
        "about": "The last time the replica was caught up to the high watermark"}
    ]}
  ]
}

DescribeQuorum Request Handling

...

Code Block
KafkaAdminClient.java
public class QuorumInfo {

    Map<Integer, VoterDescription> currentVoters();

    Map<Integer, VoterDescription> targetVoters();

    Map<Integer, VoterDescription> observers();

    Optional<Integer> leaderId();

    int leaderEpoch();

    long highWatermark();

    public static class VoterDescription {
        Optional<Long> logEndOffset();
        Optional<Long> lastCaughtUpTimeMs();
    }
}

The log end offset and last caught-up time are set as optional because the leader may not actively maintain all of the observers' status.
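As an illustration of how a tool might interpret this class, the sketch below prints the leader and an approximate replication lag for each voter. It assumes the QuorumInfo object has already been obtained through the admin client; the retrieval call itself is not shown in this excerpt.

Code Block
// Illustrative only: assumes a compiled QuorumInfo (as sketched above) obtained
// from the admin client; how it is fetched is outside this excerpt.
import java.util.Map;

public class QuorumInfoPrinter {

    static void printQuorum(QuorumInfo info) {
        System.out.printf("leader=%s epoch=%d highWatermark=%d%n",
            info.leaderId().map(String::valueOf).orElse("none"),
            info.leaderEpoch(),
            info.highWatermark());
        for (Map.Entry<Integer, QuorumInfo.VoterDescription> entry : info.currentVoters().entrySet()) {
            // logEndOffset may be absent, e.g. if the leader has not yet heard from this replica.
            long lag = entry.getValue().logEndOffset()
                .map(leo -> info.highWatermark() - leo)
                .orElse(-1L);
            System.out.printf("voter %d: records behind high watermark = %d%n", entry.getKey(), lag);
        }
    }
}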

...

This is outside the scope of this proposal, but we suggest that this approach to versioning may be useful for managing metadata consistency on the clients. Each metadata response can indicate the corresponding version of the metadata log so that clients have an easy way to detect stale metadata. Additionally, we provide a separate lastCaughtUpTimeMs field in the Fetch response, which can be useful for an observer to detect how well it is keeping up with replication. If lastCaughtUpTimeMs begins to fall behind, then an observer may decide to stop serving metadata requests from clients.
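A minimal sketch of that observer-side check is shown below. The lastCaughtUpTimeMs value is assumed to come from the latest Fetch response; the threshold and the surrounding class are illustrative assumptions rather than part of the proposal.

Code Block
// A minimal sketch of the observer-side check described above. The threshold and
// the surrounding class are illustrative assumptions, not part of the proposal.
public class ObserverLagCheck {

    private final long maxLagMs;

    public ObserverLagCheck(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    // lastCaughtUpTimeMs is taken from the latest Fetch response from the leader.
    // Returns true if the observer is fresh enough to keep serving metadata requests.
    boolean shouldServeMetadata(long lastCaughtUpTimeMs, long nowMs) {
        return (nowMs - lastCaughtUpTimeMs) <= maxLagMs;
    }
}

For example, an observer constructed with new ObserverLagCheck(30_000) would stop serving metadata requests once it has not been caught up to the high watermark for more than 30 seconds.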

Quorum Performance

The goal of the Raft quorum is to replace the Zookeeper dependency and achieve higher performance for metadata operations. In the first version, we will build the necessary metrics to monitor the end-to-end latency from an admin request (e.g. AlterPartitionReassignments) or client request being accepted to being committed. We shall monitor the time spent locally, primarily the time to fsync the new records and the time to apply changes to the state machine, which may not be a trivial operation. We shall also monitor the time used to propagate changes remotely, i.e. the latency to advance the high watermark. Benchmarks will also be built to compare the efficiency of a 3-node broker cluster using Zookeeper vs. Raft under a heavy load of metadata changes. We shall also explore existing distributed consensus system load frameworks at the same time, but this may be beyond the scope of KIP-595.
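As a rough sketch of the kind of end-to-end measurement described here, the snippet below records the time from a metadata record being appended on the leader to the high watermark advancing past it. The class and method names are hypothetical; the actual metrics implementation is left open.

Code Block
// A rough sketch of measuring append-to-commit latency for metadata records.
// Names are hypothetical; the actual metrics implementation is left open.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CommitLatencyTracker {

    private final ConcurrentMap<Long, Long> appendTimeMsByOffset = new ConcurrentHashMap<>();

    // Called when the leader appends (and fsyncs) a new metadata record.
    public void onAppend(long offset, long nowMs) {
        appendTimeMsByOffset.put(offset, nowMs);
    }

    // Called when the high watermark advances past the given offset; returns the
    // commit latency in ms, or -1 if the append time was not recorded.
    public long onHighWatermarkAdvance(long committedOffset, long nowMs) {
        Long appendTimeMs = appendTimeMsByOffset.remove(committedOffset);
        return appendTimeMs == null ? -1L : nowMs - appendTimeMs;
    }
}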

...