...
The ReplicaSelector interface is provided below. We will pass client metadata, which includes the rackId passed through the Metadata API and other connection-related information.
Code Block |
---|
class ClientMetadata {
    final String rackId;
    final String clientId;
    final InetAddress address;
    final KafkaPrincipal principal;
}
|
Code Block |
---|
interface ReplicaSelector extends Configurable, Closeable {
    /**
     * Select the replica to use for fetching the given partition.
     * This method is invoked whenever a change to the ISR of that
     * partition is observed.
     *
     * Note that this is only a recommendation. If the broker does
     * not support follower fetching, then the consumer will fetch
     * from the leader.
     */
    Node select(ClientMetadata metadata, PartitionInfo partitionInfo);

    /**
     * Optional custom configuration
     */
    default void configure(Map<String, ?> configs) {}

    /**
     * Optional shutdown hook
     */
    default void close() {}
}
|
...
Code Block |
---|
class RackAwareReplicaSelector implements ReplicaSelector {
    public Node select(ClientMetadata metadata, PartitionInfo partitionInfo) {
        // if metadata.rackId is not null, iterate through the online replicas
        // if one or more exists with matching rackId, choose a replica randomly from among them
        // otherwise return the current leader
    }
}
|
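The selection steps described in the comments above can be sketched as runnable Java. This is only an illustration under stated assumptions, not the proposed implementation: `SimpleNode`, `SimplePartitionInfo`, and `RackAwareSelectorSketch` are hypothetical stand-ins that carry just the fields the logic needs, the client rack is passed as a plain string instead of a full `ClientMetadata`, and the `Random` is injected so the random choice can be controlled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for a broker node: only an id and an optional rack.
final class SimpleNode {
    final int id;
    final String rack; // null when the broker has no rack configured

    SimpleNode(int id, String rack) {
        this.id = id;
        this.rack = rack;
    }
}

// Hypothetical stand-in for partition state: the leader and the online replicas.
final class SimplePartitionInfo {
    final SimpleNode leader;
    final List<SimpleNode> onlineReplicas;

    SimplePartitionInfo(SimpleNode leader, List<SimpleNode> onlineReplicas) {
        this.leader = leader;
        this.onlineReplicas = onlineReplicas;
    }
}

// Sketch of the rack-aware selection logic (assumed shape, for illustration only).
final class RackAwareSelectorSketch {
    private final Random random;

    RackAwareSelectorSketch(Random random) {
        this.random = random;
    }

    SimpleNode select(String clientRackId, SimplePartitionInfo partitionInfo) {
        if (clientRackId != null) {
            // Collect the online replicas whose rack matches the client's rack.
            List<SimpleNode> sameRack = new ArrayList<>();
            for (SimpleNode replica : partitionInfo.onlineReplicas) {
                if (clientRackId.equals(replica.rack)) {
                    sameRack.add(replica);
                }
            }
            // If one or more match, choose randomly among them.
            if (!sameRack.isEmpty()) {
                return sameRack.get(random.nextInt(sameRack.size()));
            }
        }
        // No client rack, or no replica in that rack: fall back to the leader.
        return partitionInfo.leader;
    }
}
```

With a leader in one rack and a follower in another, a client reporting the follower's rack is directed to the follower, while a client with no rackId, or with a rack that no online replica shares, is directed to the leader.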
...