...

The ReplicaSelector interface is provided below. We will pass it client metadata, including the rackId passed through the Metadata API, along with other connection-related information.

Code Block
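import java.io.Closeable;
import java.net.InetAddress;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

interface ClientMetadata {
  String rackId();
  String clientId();
  InetAddress clientAddress();
  KafkaPrincipal principal();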
  String listenerName();
}

interface ReplicaView {
  Node endpoint();
  long logEndOffset();
  long timeSinceLastCaughtUpMs();
}

interface PartitionView {
  Set<ReplicaView> replicas();
  Optional<ReplicaView> leader();
}

interface ReplicaSelector extends Configurable, Closeable {
    /**
     * Select the preferred replica a client should use for fetching.
     * If no replica is available, this method should return an empty Optional.
     */
    Optional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionView partitionView);

    /**
     * Optional custom configuration
     */
    default void configure(Map<String, ?> configs) {}

    /**
     * Optional shutdown hook
     */
    default void close() {}
}
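The example below illustrates the select contract, in particular the rule to return an empty Optional when no replica is available. It is a minimal sketch, not part of the proposal. The type ReplicaInfo is a hypothetical, simplified stand-in for ReplicaView that uses a broker id in place of a Node, so the sketch compiles without kafka-clients. MostCaughtUpSelector is also hypothetical. It picks the replica with the highest log end offset and, on ties, the one that caught up most recently.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for ReplicaView: a broker id replaces the Node endpoint.
final class ReplicaInfo {
    final int brokerId;
    final long logEndOffset;
    final long timeSinceLastCaughtUpMs;

    ReplicaInfo(int brokerId, long logEndOffset, long timeSinceLastCaughtUpMs) {
        this.brokerId = brokerId;
        this.logEndOffset = logEndOffset;
        this.timeSinceLastCaughtUpMs = timeSinceLastCaughtUpMs;
    }
}

// Hypothetical selector: highest log end offset wins; on a tie, the replica
// with the smallest time since it last caught up wins.
final class MostCaughtUpSelector {
    private static final Comparator<ReplicaInfo> MOST_CAUGHT_UP =
            Comparator.comparingLong((ReplicaInfo r) -> r.logEndOffset)
                    .thenComparing(Comparator.comparingLong((ReplicaInfo r) -> r.timeSinceLastCaughtUpMs).reversed());

    // Returns Optional.empty() when there is no replica to choose from, as the interface requires.
    static Optional<ReplicaInfo> select(List<ReplicaInfo> replicas) {
        return replicas.stream().max(MOST_CAUGHT_UP);
    }
}
```

With replicas (id 1, LEO 100, 50 ms), (id 2, LEO 100, 10 ms) and (id 3, LEO 90, 0 ms), the sketch picks broker 2. An empty list yields Optional.empty().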

...

Code Block
class LeaderSelector implements ReplicaSelector {

  @Override
  public Optional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionView partitionView) {
    return partitionView.leader();
  }
}


The second uses the rackId provided by the client and selects a replica whose rack exactly matches it, falling back to the leader when there is no match.

Code Block
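class RackAwareReplicaSelector implements ReplicaSelector {
  @Override
  public Optional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionView partitionView) {
    String rackId = metadata.rackId();
    if (rackId != null) {
      // Among replicas whose rack exactly matches rackId, choose the most caught-up one (highest LEO, then most recent catch-up)
      Optional<ReplicaView> sameRack = partitionView.replicas().stream()
          .filter(replica -> rackId.equals(replica.endpoint().rack()))
          .max(Comparator.comparingLong(ReplicaView::logEndOffset)
              .thenComparing(Comparator.comparingLong(ReplicaView::timeSinceLastCaughtUpMs).reversed()));
      if (sameRack.isPresent()) return sameRack;
    }
    // Otherwise (no rackId, or no replica on the client's rack) return the current leader
    return partitionView.leader();
  }
}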
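The example below is a self-contained version of the rack matching with leader fallback. It is a sketch under simplifying assumptions and does not use the Kafka types. RackReplica and RackAwareSketch are hypothetical names. A replica carries only a broker id, a rack string (possibly null) and a log end offset, and the leader is passed in separately, as PartitionView.leader() provides it.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified replica model: broker id, rack, and log end offset only.
final class RackReplica {
    final int brokerId;
    final String rack;        // null if the broker has no rack configured
    final long logEndOffset;

    RackReplica(int brokerId, String rack, long logEndOffset) {
        this.brokerId = brokerId;
        this.rack = rack;
        this.logEndOffset = logEndOffset;
    }
}

final class RackAwareSketch {
    // Exact rack match first (most caught-up among matches); otherwise fall back to
    // the leader, which may itself be empty if the partition has no leader.
    static Optional<RackReplica> select(String clientRack, List<RackReplica> replicas,
                                        Optional<RackReplica> leader) {
        if (clientRack != null) {
            Optional<RackReplica> sameRack = replicas.stream()
                    .filter(r -> clientRack.equals(r.rack))
                    .max(Comparator.comparingLong(r -> r.logEndOffset));
            if (sameRack.isPresent()) {
                return sameRack;
            }
        }
        return leader;
    }
}
```

Suppose a client in rack "us-east-1b", the leader is in "us-east-1a", and followers 2 (LEO 90) and 3 (LEO 95) are both in "us-east-1b". The sketch then returns follower 3. A client with no rack, or with a rack that no replica shares, gets the leader.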

...

When an OffsetsForLeaderEpoch request is received from a consumer, the returned offset will be limited to the high watermark. As with the Fetch API, when a leader has just been elected, the true high watermark is not known for a short time. If an OffsetsForLeaderEpoch request is received with the latest epoch during this window, the leader will respond with the OFFSET_NOT_AVAILABLE error code. This will cause the consumer to retry. Note as well that only leaders are allowed to handle OffsetsForLeaderEpoch queries. 
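The sketch below models the consumer-facing rule described above. It is a minimal model, not the broker's code. ConsumerEpochLookup, its Result type and the parameter names are hypothetical, and the error code is represented as a plain string. The high watermark is an OptionalLong that is empty while a newly elected leader does not yet know its true value. The text does not specify what happens to an older epoch during that window. In that case the sketch returns the epoch's end offset unclamped, purely as an assumption.

```java
import java.util.OptionalLong;

// Hypothetical model of how a leader answers a consumer's OffsetsForLeaderEpoch query.
final class ConsumerEpochLookup {
    static final String OFFSET_NOT_AVAILABLE = "OFFSET_NOT_AVAILABLE";

    // Either an end offset or an error name, never both.
    static final class Result {
        final OptionalLong endOffset;
        final String error;

        private Result(OptionalLong endOffset, String error) {
            this.endOffset = endOffset;
            this.error = error;
        }

        static Result offset(long offset) { return new Result(OptionalLong.of(offset), null); }
        static Result failed(String error) { return new Result(OptionalLong.empty(), error); }
    }

    static Result lookup(int requestedEpoch, int latestEpoch, long endOffsetForEpoch,
                         OptionalLong highWatermark) {
        if (!highWatermark.isPresent()) {
            if (requestedEpoch == latestEpoch) {
                // True high watermark not known yet: the consumer is expected to retry.
                return Result.failed(OFFSET_NOT_AVAILABLE);
            }
            // Assumption (not specified in the text): older epochs are answered unclamped.
            return Result.offset(endOffsetForEpoch);
        }
        // Offsets returned to consumers are limited to the high watermark.
        return Result.offset(Math.min(endOffsetForEpoch, highWatermark.getAsLong()));
    }
}
```

For example, if the latest epoch ends at offset 120 but the high watermark is 100, the consumer sees 100.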

A new JMX metric will be added to the Java client to report its preferred read replica. Under the "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=*,partition=*" object, a new attribute "preferred-read-replica" will be added. Its value is the ID of the broker the consumer is currently fetching from. If the attribute is missing or set to -1, the consumer is fetching from the leader.
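A monitoring tool running in the consumer's JVM could read the new attribute through the standard javax.management API, as sketched below. This is a sketch, not a supported tool. PreferredReplicaProbe is a hypothetical name. The sketch assumes the client id needs no quoting inside an ObjectName, and it assumes only that the attribute value is a java.lang.Number. Following the semantics above, a missing attribute is reported as -1, meaning "fetching from the leader".

```java
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import javax.management.AttributeNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

final class PreferredReplicaProbe {
    // Maps "topic-partition" to the broker id the given client is fetching from;
    // -1 (explicit or because the attribute is missing) means the leader.
    static Map<String, Integer> read(String clientId) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Map<String, Integer> result = new HashMap<>();
        try {
            // Property-value wildcards match every topic and partition of this client.
            ObjectName pattern = new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics,client-id="
                    + clientId + ",topic=*,partition=*");
            for (ObjectName name : server.queryNames(pattern, null)) {
                String key = name.getKeyProperty("topic") + "-" + name.getKeyProperty("partition");
                try {
                    Object value = server.getAttribute(name, "preferred-read-replica");
                    result.put(key, value instanceof Number ? ((Number) value).intValue() : -1);
                } catch (AttributeNotFoundException e) {
                    result.put(key, -1); // attribute absent: fetching from the leader
                }
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX query failed", e);
        }
        return result;
    }
}
```

If no consumer with that client id is registered in the JVM, the query matches nothing and the map is empty.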

Compatibility, Deprecation, and Migration Plan

...