
...

We use the "rack.id" configuration to identify the location of the broker, so we will add the same configuration to the consumer. This will be used in Metadata requests as documented below. In this proposal, the consumer will always fetch from the preferred replica returned by the Metadata request, regardless of whether a "rack.id" is provided.

Configuration Name    Valid Values         Default Value
rack.id               <nullable string>    null
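
For illustration, a consumer would only need to set "rack.id" alongside its usual configuration for the broker to take its location into account. The following is a minimal sketch (imports omitted; the bootstrap address, group id, topic, and rack value are placeholders, not part of this proposal):

Code Block
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("rack.id", "us-east-1a");  // the consumer's rack, passed to the broker in Metadata requests

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));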

Broker API 

We will expose a new plugin interface configured through the "replica.selector.class" configuration that allows users to provide custom logic to determine the preferred replica to fetch from.

Configuration Name       Valid Values                                    Default Value
replica.selector.class   class name of ReplicaSelector implementation    LeaderSelector

By default, Kafka will use an implementation which always returns the current partition leader (if one exists). This is for backwards compatibility.

The ReplicaSelector interface is provided below. We will pass client metadata, including the rackId sent through the Metadata API and other connection-related information.

Code Block
class ClientMetadata {
  final String rackId;
  final String clientId;
  final InetAddress address;
  final KafkaPrincipal principal;
}  

interface ReplicaSelector extends Configurable, Closeable {
    /**
     * Select the preferred replica a client should use for fetching.
     * If no replica is available, this method should return null.
     */
    Node select(ClientMetadata metadata, PartitionInfo partitionInfo);

    /**
     * Optional custom configuration
     */
    default void configure(Map<String, ?> configs) {}

    /**
     * Optional shutdown hook
     */
    default void close() {}
}


We will provide two implementations out of the box. One of these is the LeaderSelector mentioned above.

Code Block
class LeaderSelector implements ReplicaSelector {

  Node select(ClientMetadata metadata, PartitionInfo partitionInfo) {
    return partitionInfo.leader();
  }
}


The second uses the rackId provided by the client and implements exact matching on the rackId of the replicas.
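
A rack-matching selector along these lines might look roughly like the sketch below. The class name is hypothetical, and the replicas() and rack() accessors on PartitionInfo and Node are assumed; this illustrates the idea rather than the final implementation:

Code Block
class RackMatchingSelector implements ReplicaSelector {

  Node select(ClientMetadata metadata, PartitionInfo partitionInfo) {
    // If the client provided a rack, prefer a replica in the same rack
    if (metadata.rackId != null) {
      for (Node replica : partitionInfo.replicas()) {
        if (metadata.rackId.equals(replica.rack()))
          return replica;
      }
    }
    // Otherwise fall back to the partition leader
    return partitionInfo.leader();
  }
}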

...

Code Block
MetadataResponse => ThrottleTimeMs Brokers ClusterId ControllerId [TopicMetadata]
  ThrottleTimeMs => INT32
  Brokers => [MetadataBroker]
  ClusterId => NULLABLE_STRING
  ControllerId => INT32
   
TopicMetadata => ErrorCode TopicName IsInternal [PartitionMetadata]
  ErrorCode => INT16
  TopicName => STRING
  IsInternal => BOOLEAN
   
PartitionMetadata => ErrorCode PartitionId Leader LeaderEpoch Replicas ISR OfflineReplicas
  ErrorCode => INT16
  PartitionId => INT32
  Leader => INT32
  LeaderEpoch => INT32
  PreferredReadReplica => INT32 // This is new
  Replicas => [INT32]
  ISR => [INT32]
  OfflineReplicas => [INT32]
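
To illustrate how a client could act on this field, the sketch below resolves the node to fetch from. The -1 "no preferred replica" sentinel and the brokersById lookup are assumptions made for illustration, not part of the protocol definition:

Code Block
Node fetchTarget(int preferredReadReplica, int leaderId, Map<Integer, Node> brokersById) {
  // Fetch from the preferred read replica when the broker supplied one,
  // otherwise fall back to the partition leader as before
  int targetId = preferredReadReplica >= 0 ? preferredReadReplica : leaderId;
  return brokersById.get(targetId);
}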

...

  • Consumer Replica Selection: The first iteration of this KIP proposed to let the consumer decide which replica to fetch from using an interface similar to the one we have above. Ultimately we felt it was easier to reason about the system in a multi-tenant environment when replica selection logic could be controlled by the broker. It is much more difficult to have common replica selection logic through a client plugin since it involves coordinating dependencies and configuration across many applications. For the same reason, we also rejected the option to specify whether the preferred replica should be used by the client. 
  • Handling Old Fetch Versions: We have considered allowing the consumer to use older versions of the Fetch API. The only complication is that older versions will not allow us to use the more specific out of range error codes. This complicates out of range handling because we do not know whether we should expect the out of range offset to exist on the leader or not. If the offset is too small on the follower, it may still be in range on the leader. One option is to always do offset reconciliation on the replica that the consumer is fetching from. The downside of this is that we cannot detect situations when the consumer has seen a later offset than a replica due to stale propagation of the ISR state. 
  • Improved Log Start Offset Handling: We have mentioned the difficulty of reasoning about the consistency of the log start offset due to the fact that the retention is enforced independently by all replicas. One option we have considered is to only allow the leader to enforce retention and to rely only on the propagation of the log start offset in the Fetch response for follower deletion. In principle, this would give us the guarantee that the log start offset of a follower should always be less than or equal to that of the leader. However, unless we make log start offset acknowledgement a criteria for ISR membership, we cannot actually guarantee this. In the case of multiple leader elections occurring in a short time period, it is possible for this guarantee to be violated. Nevertheless, we think it may still be a good idea to rely on log start offset propagation for follower deletion, but we will consider this separately.