...

Note: performance tests are being run to demonstrate the improvements from the proposed changes; results will follow.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and API behavior

  • Any class in the public packages under clients, and configuration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

This KIP proposes to make new leader information available in the ProduceResponse and FetchResponse when the CurrentLeaderEpoch in the request is less than the leader epoch known by the receiving broker. This should help us eliminate the metadata refresh from the critical path of the Produce and Fetch requests, which tend to be very latency-sensitive. This is an optimization mentioned in KIP-595, which uses a similar technique but only for the metadata log and the Fetch RPC.
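
The mechanism hinges on a single comparison. The sketch below shows it in Java; LeaderEpochGate is a hypothetical helper, not actual broker code, but the rule matches the one described above.

    // A minimal sketch of the gating condition; LeaderEpochGate and its
    // method are hypothetical stand-ins, not actual broker classes.
    final class LeaderEpochGate {
        // Piggyback the newer leader on the response only when the client's
        // CurrentLeaderEpoch is behind the epoch this broker knows about.
        static boolean shouldIncludeNewLeader(int requestCurrentLeaderEpoch, int brokerLeaderEpoch) {
            return requestCurrentLeaderEpoch < brokerLeaderEpoch;
        }
    }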

The following sections consider the situations in which leadership changes; this can happen due to a controlled shutdown, a replica reassignment, or a preferred leader election.

Broker

When the leadership of a partition moves from the old leader to a new one, the old leader can inform the client of the new leader's LeaderEpoch and LeaderId via the ProduceResponse and FetchResponse. Notably, this information is sent together with the existing error codes NOT_LEADER_OR_FOLLOWER and FENCED_LEADER_EPOCH. The new leader information is obtained either from the replica partition state (if the receiving broker is still a replica of the partition) or from the broker's metadata cache (if the receiving broker is no longer a replica of the partition because of a reassignment).
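
A minimal sketch of that broker-side fallback is shown below; LeaderInfo, ReplicaManager, MetadataCache, and LeaderInfoResolver are hypothetical stand-ins for illustration, not the actual Kafka broker classes.

    import java.util.Optional;

    record LeaderInfo(int leaderId, int leaderEpoch) {}

    interface ReplicaManager {
        // Present only if this broker is still a replica of the partition.
        Optional<LeaderInfo> partitionLeaderInfo(String topic, int partition);
    }

    interface MetadataCache {
        // Fallback when this broker is no longer a replica (e.g. after reassignment).
        Optional<LeaderInfo> cachedLeaderInfo(String topic, int partition);
    }

    final class LeaderInfoResolver {
        private final ReplicaManager replicaManager;
        private final MetadataCache metadataCache;

        LeaderInfoResolver(ReplicaManager replicaManager, MetadataCache metadataCache) {
            this.replicaManager = replicaManager;
            this.metadataCache = metadataCache;
        }

        // Resolve the leader info to attach alongside NOT_LEADER_OR_FOLLOWER /
        // FENCED_LEADER_EPOCH: prefer local replica state, fall back to the
        // broker's metadata cache.
        Optional<LeaderInfo> resolve(String topic, int partition) {
            return replicaManager.partitionLeaderInfo(topic, partition)
                    .or(() -> metadataCache.cachedLeaderInfo(topic, partition));
        }
    }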

Client

The client will accept the new leader information (LeaderId and LeaderEpoch) only if it advances the client's view of the leader (i.e., the new leader's epoch is greater than the one the client already knows), and will use it in subsequent retries of the Produce and Fetch requests. On the client, it can happen that subsequent metadata refreshes return stale leader information, if the latest metadata has not yet fully propagated to the entire cluster. The client will make sure that newer leader information is not overridden by a stale leader's information (again by comparing LeaderEpochs); the Kafka Java client already does this in Metadata::updateLatestMetadata.
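
The epoch-comparison rule could look like the following sketch; ClientLeaderCache and its fields are hypothetical stand-ins for the client's internal metadata, while the real Java client applies the equivalent check in Metadata::updateLatestMetadata.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class ClientLeaderCache {
        record Leader(int leaderId, int leaderEpoch) {}

        private final Map<String, Leader> leaderByPartition = new ConcurrentHashMap<>();

        // Accept a leader carried in a ProduceResponse/FetchResponse (or in a
        // metadata refresh) only if it advances the epoch we already know;
        // the real client performs this check under its metadata lock.
        boolean maybeUpdateLeader(String topicPartition, Leader candidate) {
            Leader current = leaderByPartition.get(topicPartition);
            if (current == null || candidate.leaderEpoch() > current.leaderEpoch()) {
                leaderByPartition.put(topicPartition, candidate);
                return true;  // view advanced; use this leader for the next retry
            }
            return false;     // stale or duplicate; keep the current view
        }
    }

Rejecting non-advancing updates is what protects the client when a stale metadata refresh arrives after the piggybacked leader information.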

Even though the client receives the new leader information in the ProduceResponse and FetchResponse when the leader changes, it will still request an expedited metadata refresh, performed asynchronously, matching the existing behaviour of the Kafka Java client. Since a leadership change is likely to affect many partitions, future requests to those partitions will benefit from the refreshed metadata.

For Produce, the client will no longer back off for up to RETRY_BACKOFF_MS_CONFIG before retrying the failed batch if the ProduceResponse carried new leader information that advanced the client's metadata state. This immediate retry is appealing because the client is going to retry on a different broker, and the retry is likely to succeed because it targets a newer leader. On the other hand, subsequent retries to the same new LeaderEpoch should still be subject to the client's backoff strategy.
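
A minimal sketch of that retry decision follows; ProduceRetryPolicy is a hypothetical illustration, and RETRY_BACKOFF_MS stands in for the retry.backoff.ms client setting behind RETRY_BACKOFF_MS_CONFIG.

    final class ProduceRetryPolicy {
        private static final long RETRY_BACKOFF_MS = 100L; // stand-in for retry.backoff.ms

        // leaderAdvanced: did the ProduceResponse carry leader information that
        // advanced the client's metadata state (see maybeUpdateLeader above)?
        // attemptsOnThisLeaderEpoch: retries already sent to the new epoch;
        // it resets to zero whenever the known LeaderEpoch advances.
        long backoffBeforeRetryMs(boolean leaderAdvanced, int attemptsOnThisLeaderEpoch) {
            if (leaderAdvanced && attemptsOnThisLeaderEpoch == 0) {
                return 0L;               // retry immediately on the newer leader
            }
            return RETRY_BACKOFF_MS;     // repeated attempts on the same epoch back off
        }
    }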

In the ProduceResponse and FetchResponse, the new leader's connection parameters (host and port) will also be supplied along with the LeaderId and LeaderEpoch when the errors NOT_LEADER_OR_FOLLOWER and FENCED_LEADER_EPOCH are returned. This is useful when the new leader's connection parameters are missing from the client's metadata cache or are stale, which can happen when an existing broker restarts or a new broker is added to the cluster.

For instance, when an existing broker restarts, any metadata refresh performed on the client while the broker is down will remove that broker's connection parameters from the cache. If the broker then becomes the new leader after restarting, the client will need its connection parameters, along with the LeaderId and LeaderEpoch, to connect to it in a subsequent retry of the failed Produce or Fetch request. To this effect, LeaderHost and LeaderPort are included in the responses. Additionally, LeaderRack is returned for consistency with the broker information returned in the MetadataResponse. This information simply overrides the previously stored broker information in the locally cached metadata.
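
A minimal sketch of that cache override, again with hypothetical stand-ins (BrokerConnectionCache and Node are illustrations, not the client's actual classes):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class BrokerConnectionCache {
        record Node(int id, String host, int port, String rack) {}

        private final Map<Integer, Node> nodesById = new ConcurrentHashMap<>();

        // Called when a ProduceResponse/FetchResponse carries the new leader's
        // endpoint: simply replace whatever was cached for that broker id, so
        // a retry can connect even if the broker was absent from the client's
        // last metadata refresh.
        void overrideFromResponse(int leaderId, String leaderHost, int leaderPort, String leaderRack) {
            nodesById.put(leaderId, new Node(leaderId, leaderHost, leaderPort, leaderRack));
        }

        Node node(int brokerId) {
            return nodesById.get(brokerId);
        }
    }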

Compatibility, Deprecation, and Migration Plan

...