

Status

Current state: Accepted

Discussion thread:

JIRA: KAFKA-8443

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The ReplicaSelector interface is provided below. The selector will receive client metadata, including the rackId passed through the Metadata API and other connection-related information.

Code Block
import java.io.Closeable;
import java.net.InetAddress;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

/** Metadata about the client that is fetching. */
interface ClientMetadata {
  String rackId();
  String clientId();
  InetAddress clientAddress();
  KafkaPrincipal principal();
  String listenerName();
}

/** A single replica of a partition, as seen by the selector. */
interface ReplicaView {
  Node endpoint();
  long logEndOffset();
  long timeSinceLastCaughtUpMs();
}

/** The replicas of a partition and its current leader, if any. */
interface PartitionView {
  Set<ReplicaView> replicas();
  Optional<ReplicaView> leader();
}

interface ReplicaSelector extends Configurable, Closeable {
  /**
   * Select the preferred replica a client should use for fetching.
   * If no replica is available, this method should return an empty optional.
   */
  Optional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionView partitionView);

  /**
   * Optional custom configuration
   */
  default void configure(Map<String, ?> configs) {}

  /**
   * Optional shutdown hook
   */
  default void close() {}
}

...

Code Block
class LeaderSelector implements ReplicaSelector {

  @Override
  public Optional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionView partitionView) {
    // Always fetch from the current leader, if there is one
    return partitionView.leader();
  }
}


The second uses the rackId provided by the client and matches it exactly against the rackId of each replica.

Code Block
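class RackAwareReplicaSelector implements ReplicaSelector {

  @Override
  public Optional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionView partitionView) {
    // If rackId is set, iterate through the replicas whose broker rack matches it exactly
    if (metadata.rackId() != null && !metadata.rackId().isEmpty()) {
      Optional<ReplicaView> sameRack = partitionView.replicas().stream()
          .filter(replica -> metadata.rackId().equals(replica.endpoint().rack()))
          // If one or more match, choose the most caught-up (highest log end offset)
          .max(java.util.Comparator.comparingLong(ReplicaView::logEndOffset));
      if (sameRack.isPresent()) return sameRack;
    }
    // Otherwise return the current leader
    return partitionView.leader();
  }
}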
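The rack-matching rule can be tried out in isolation. The following is a minimal, self-contained sketch rather than broker code: `Replica` is a hypothetical stand-in for ReplicaView that keeps only the broker ID, rack and log end offset, and the example assumes Java 16+ for the record syntax. Ties between equally caught-up replicas go to whichever one `Stream.max` returns.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class RackAwareSelectionSketch {
    // Hypothetical stand-in for ReplicaView: only the fields the rule needs.
    record Replica(int brokerId, String rack, long logEndOffset) {}

    // Among replicas whose rack exactly matches the client's rackId, return the one with
    // the highest log end offset; if rackId is absent or nothing matches, return the leader.
    static Optional<Replica> select(String clientRackId, List<Replica> replicas, Optional<Replica> leader) {
        if (clientRackId == null || clientRackId.isEmpty()) {
            return leader;
        }
        Optional<Replica> sameRack = replicas.stream()
            .filter(r -> clientRackId.equals(r.rack()))
            .max(Comparator.comparingLong(Replica::logEndOffset));
        return sameRack.isPresent() ? sameRack : leader;
    }

    public static void main(String[] args) {
        Replica leader = new Replica(1, "us-east-1a", 500L);
        List<Replica> replicas = List.of(
            leader,
            new Replica(2, "us-east-1b", 480L),
            new Replica(3, "us-east-1b", 300L));

        // Client in us-east-1b: broker 2 is the most caught-up replica in that rack
        System.out.println(select("us-east-1b", replicas, Optional.of(leader)).map(Replica::brokerId).orElse(-1)); // 2
        // No replica in us-east-1c: fall back to the leader
        System.out.println(select("us-east-1c", replicas, Optional.of(leader)).map(Replica::brokerId).orElse(-1)); // 1
        // No rackId supplied: fetch from the leader
        System.out.println(select(null, replicas, Optional.of(leader)).map(Replica::brokerId).orElse(-1)); // 1
    }
}
```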

...

When an OffsetsForLeaderEpoch request is received from a consumer, the returned offset will be limited to the high watermark. As with the Fetch API, when a leader has just been elected, the true high watermark is not known for a short time. If an OffsetsForLeaderEpoch request is received with the latest epoch during this window, the leader will respond with the OFFSET_NOT_AVAILABLE error code. This will cause the consumer to retry. Note as well that only leaders are allowed to handle OffsetsForLeaderEpoch queries. 
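The leader-side rule above can be sketched as a pure function. This is an illustration under stated assumptions, not the broker implementation: `hwKnown` is a hypothetical flag for the window right after election when the true high watermark is not yet known, `NOT_LEADER` is a placeholder for whatever error a non-leader returns, and capping older epochs at the current high watermark during that window is an assumption.

```java
public class OffsetsForLeaderEpochSketch {
    // Local stand-ins for error codes. OFFSET_NOT_AVAILABLE is named in the text;
    // NOT_LEADER is a hypothetical placeholder for the error a non-leader would return.
    enum ErrorCode { NONE, OFFSET_NOT_AVAILABLE, NOT_LEADER }

    record Result(ErrorCode error, long endOffset) {}

    /**
     * Answers a consumer's OffsetsForLeaderEpoch query for one partition.
     *
     * @param isLeader       whether this broker currently leads the partition
     * @param requestedEpoch epoch in the consumer's request
     * @param latestEpoch    the leader's current epoch
     * @param epochEndOffset end offset the log reports for requestedEpoch
     * @param highWatermark  the leader's high watermark
     * @param hwKnown        false during the window right after election (hypothetical flag)
     */
    static Result handle(boolean isLeader, int requestedEpoch, int latestEpoch,
                         long epochEndOffset, long highWatermark, boolean hwKnown) {
        if (!isLeader) {
            // Only leaders are allowed to handle OffsetsForLeaderEpoch queries
            return new Result(ErrorCode.NOT_LEADER, -1L);
        }
        if (requestedEpoch == latestEpoch && !hwKnown) {
            // Newly elected leader: the true high watermark is not known yet, so the consumer retries
            return new Result(ErrorCode.OFFSET_NOT_AVAILABLE, -1L);
        }
        // Never return an offset beyond the high watermark to a consumer
        return new Result(ErrorCode.NONE, Math.min(epochEndOffset, highWatermark));
    }

    public static void main(String[] args) {
        Result[] results = {
            handle(true, 5, 5, 1200L, 1000L, true),   // NONE 1000 (capped at the high watermark)
            handle(true, 5, 5, 1200L, 1000L, false),  // OFFSET_NOT_AVAILABLE -1
            handle(true, 4, 5, 800L, 1000L, true),    // NONE 800 (already below the high watermark)
            handle(false, 5, 5, 1200L, 1000L, true),  // NOT_LEADER -1
        };
        for (Result r : results) {
            System.out.println(r.error() + " " + r.endOffset());
        }
    }
}
```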

A new JMX metric will be added to the Java client when it has a preferred read replica. Under the "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=*,partition=*" object, a new attribute "preferred-read-replica" will be added. This metric will have a value equal to the broker ID which the consumer is currently fetching from. If the attribute is missing or set to -1, it means the consumer is fetching from the leader.
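Reading the metric is an ordinary JMX query. The sketch below assumes the consumer runs in the same JVM, so its metrics are registered with the platform MBean server; it scans every MBean matching the object name pattern above and, following the text, treats a missing attribute or a value of -1 as "fetching from the leader". The class and method names are illustrative only.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class PreferredReadReplicaProbe {
    // Object name pattern from the text; the wildcards match every client, topic and partition.
    static final String PATTERN =
        "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=*,partition=*";

    /** Maps each matching MBean name to the broker ID being fetched from, or -1 for the leader. */
    static Map<String, Integer> preferredReadReplicas(MBeanServer server) {
        ObjectName pattern;
        try {
            pattern = new ObjectName(PATTERN);
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e);
        }
        Map<String, Integer> result = new TreeMap<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            int brokerId = -1;
            try {
                Object value = server.getAttribute(name, "preferred-read-replica");
                // Guard against non-numeric or NaN values, which carry no broker ID
                if (value instanceof Number && !Double.isNaN(((Number) value).doubleValue())) {
                    brokerId = ((Number) value).intValue();
                }
            } catch (JMException e) {
                // Attribute missing: per the text, the consumer is fetching from the leader
            }
            result.put(name.toString(), brokerId);
        }
        return result;
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        preferredReadReplicas(server).forEach((mbean, broker) ->
            System.out.println(mbean + " -> " + (broker == -1 ? "leader" : "broker " + broker)));
    }
}
```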

Compatibility, Deprecation, and Migration Plan

...