Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Add ClientMetadata to encapsulate client state in replica selector

...

The ReplicaSelector interface is provided below:. We will pass client metadata include the rackId passed through the Metadata API and other connection-related information.

Code Block
class ClientMetadata {
  final String rackId;
  final String clientId;
  final InetAddress address;
  final KafkaPrincipal principal;
}  

Code Block
interface ReplicaSelector extends Configurable, Closeable {
    /**
     * Select the replica to use for fetching the given partition.
     * This method is invoked whenever a change to the ISR of that
     * partition is observed.
     *
     * Note that this is only a recommendation. If the broker does
     * not support follower fetching, then the consumer will fetch
     * from the leader.
	 */
	Node select(StringClientMetadata rackIdmetadata, PartitionInfo partitionInfo);

    /**
     * Optional custom configuration
     */
    default void configure(Map<String, ?> configs) {}

    /**
     * Optional shutdown hook
     */
	default void close() {}
}

...

Code Block
class RackAwareReplicaSelector implements ReplicaSelector {

  Node select(StringClientMetadata rackIdmetadata, PartitionInfo partitionInfo) {
    // if rackId is not null, iterate through the online replicas 
    // if one or more exists with matching rackId, choose a replica randomly from among them
    // otherwise return the current leader
  }
}

...