...
The ReplicaSelector interface is provided below. We will pass client metadata include the rackId passed through the Metadata API and other connection-related information.
Code Block |
---|
classinterface ClientMetadata { final String rackId(); final String clientId(); final InetAddress addressclientAddress(); final KafkaPrincipal principal(); String listenerName(); } interface ReplicaView { Node endpoint(); long logEndOffset(); long timeSinceLastCaughtUpMs(); } interface PartitionView { Set<ReplicaView> replicas(); Optional<ReplicaView> leader(); } interface ReplicaSelector extends Configurable, Closeable { /** * Select the preferred replica a client should use for fetching. * If no replica is available, this method should return an empty nulloptional. */ NodeOptional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionInfoPartitionView partitionInfopartitionView); /** * Optional custom configuration */ default void configure(Map<String, ?> configs) {} /** * Optional shutdown hook */ default void close() {} } |
...
Code Block |
---|
class LeaderSelector implements ReplicaSelector { NodeOptional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionInfoPartitionView partitionInfopartitionView) { return partitionInfopartitionView.leader(); } } |
The second uses the rackId provided by the clients and implements exact matching on the rackId from replicas.
Code Block |
---|
class RackAwareReplicaSelector implements ReplicaSelector { NodeOptional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionInfoPartitionView partitionInfopartitionView) { // if rackId is not null, iterate through the online replicas // if one or more exists with matching rackId, choose a the most caught-up replica randomly from among them // otherwise return the current leader } } |
...
When an OffsetsForLeaderEpoch request is received from a consumer, the returned offset will be limited to the high watermark. As with the Fetch API, when a leader has just been elected, the true high watermark is not known for a short time. If an OffsetsForLeaderEpoch request is received with the latest epoch during this window, the leader will respond with the OFFSET_NOT_AVAILABLE error code. This will cause the consumer to retry. Note as well that only leaders are allowed to handle OffsetsForLeaderEpoch queries.
A new JMX metric will be added to the Java client when it has a preferred read replica. Under the "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=*,partition=*" object, a new attribute "preferred-read-replica" will be added. This metric will have a value equal to the broker ID which the consumer is currently fetching from. If the attribute is missing or set to -1, it means the consumer is fetching from the leader.
Compatibility, Deprecation, and Migration Plan
...