...
We will use the "rack.id" configuration to identify the location of the consumer, mirroring the rack configuration on the broker. It will be passed in Metadata requests as documented below. In this proposal, the consumer will always fetch from the preferred replica returned by the Metadata request, regardless of whether a `rack.id` is provided.

Configuration Name | Valid Values | Default Value |
---|---|---|
rack.id | <nullable string> | null |
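For example, a consumer running in a rack (or availability zone) named `us-east-1a` would identify its location as follows; the rack name is illustrative:

```properties
# consumer.properties -- rack name is illustrative
rack.id=us-east-1a
```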
Broker API
We will expose a new plugin interface configured through the "replica.selector.class" configuration that allows users to provide custom logic to determine the preferred replica to fetch from.
Configuration Name | Valid Values | Default Value |
---|---|---|
replica.selector.class | class name of ReplicaSelector implementation | LeaderSelector |
By default, Kafka will use an implementation which always returns the current partition leader (if one exists). This is for backwards compatibility.
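To plug in custom logic, a broker operator would set the selector class in the broker configuration; the class name below is a hypothetical example:

```properties
# server.properties -- class name is a hypothetical example
replica.selector.class=com.example.CustomReplicaSelector
```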
The ReplicaSelector interface is provided below. We will pass client metadata, including the rackId sent through the Metadata API as well as other connection-related information.
```java
class ClientMetadata {
    final String rackId;
    final String clientId;
    final InetAddress address;
    final KafkaPrincipal principal;
}

interface ReplicaSelector extends Configurable, Closeable {
    /**
     * Select the preferred replica a client should use for fetching.
     * This method is invoked whenever a change to the ISR of the partition is observed.
     * If no replica is available, this method should return null.
     * Note that this is only a recommendation.
     */
    Node select(ClientMetadata metadata, PartitionInfo partitionInfo);

    /**
     * Optional custom configuration
     */
    default void configure(Map<String, ?> configs) {}

    /**
     * Optional shutdown hook
     */
    default void close() {}
}
```

If the broker does not support follower fetching, then the consumer will fetch from the leader.
We will provide two implementations out of the box. One of these is the LeaderSelector mentioned above.
```java
class LeaderSelector implements ReplicaSelector {
    @Override
    public Node select(ClientMetadata metadata, PartitionInfo partitionInfo) {
        return partitionInfo.leader();
    }
}
```
The second uses the rackId provided by the client and implements exact matching against the rackId of the replicas.
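As a rough sketch of this matching logic, the following uses simplified stand-in types for Kafka's `Node` and `PartitionInfo` so it is self-contained; the real plugin would implement the `ReplicaSelector` interface above against Kafka's own types:

```java
import java.util.List;
import java.util.Optional;

// Simplified stand-ins for Kafka's Node and PartitionInfo, used only to keep
// this sketch self-contained.
class Node {
    final int id;
    final String rack;
    Node(int id, String rack) { this.id = id; this.rack = rack; }
}

class PartitionInfo {
    final Node leader;
    final List<Node> replicas;
    PartitionInfo(Node leader, List<Node> replicas) {
        this.leader = leader;
        this.replicas = replicas;
    }
}

// Exact-match selection: prefer a replica in the same rack as the client,
// falling back to the leader when no rack matches or no rackId was given.
class RackMatchingSelector {
    Node select(String clientRackId, PartitionInfo partitionInfo) {
        if (clientRackId != null) {
            Optional<Node> match = partitionInfo.replicas.stream()
                    .filter(replica -> clientRackId.equals(replica.rack))
                    .findFirst();
            if (match.isPresent()) {
                return match.get();
            }
        }
        return partitionInfo.leader;
    }
}
```

Note that the fallback to the leader keeps existing behavior for consumers that do not configure a rack.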
...
```
MetadataResponse => ThrottleTimeMs Brokers ClusterId ControllerId [TopicMetadata]
  ThrottleTimeMs => INT32
  Brokers => [MetadataBroker]
  ClusterId => NULLABLE_STRING
  ControllerId => INT32
  TopicMetadata => ErrorCode TopicName IsInternal [PartitionMetadata]
    ErrorCode => INT16
    TopicName => STRING
    IsInternal => BOOLEAN
    PartitionMetadata => ErrorCode PartitionId Leader LeaderEpoch PreferredReadReplica Replicas ISR OfflineReplicas
      ErrorCode => INT16
      PartitionId => INT32
      Leader => INT32
      LeaderEpoch => INT32
      PreferredReadReplica => INT32 // This is new
      Replicas => [INT32]
      ISR => [INT32]
      OfflineReplicas => [INT32]
```
...
- Consumer Replica Selection: The first iteration of this KIP proposed to let the consumer decide which replica to fetch from using an interface similar to the one we have above. Ultimately we felt it was easier to reason about the system in a multi-tenant environment when replica selection logic could be controlled by the broker. It is much more difficult to have common replica selection logic through a client plugin since it involves coordinating dependencies and configuration across many applications. For the same reason, we also rejected the option to specify whether the preferred replica should be used by the client.
- Handling Old Fetch Versions: We have considered allowing the consumer to use older versions of the Fetch API. The only complication is that older versions will not allow us to use the more specific out of range error codes. This complicates out of range handling because we do not know whether we should expect the out of range offset to exist on the leader or not. If the offset is too small on the follower, it may still be in range on the leader. One option is to always do offset reconciliation on the replica that the consumer is fetching from. The downside of this is that we cannot detect situations when the consumer has seen a later offset than a replica due to stale propagation of the ISR state.
- Improved Log Start Offset Handling: We have mentioned the difficulty of reasoning about the consistency of the log start offset due to the fact that the retention is enforced independently by all replicas. One option we have considered is to only allow the leader to enforce retention and to rely only on the propagation of the log start offset in the Fetch response for follower deletion. In principle, this would give us the guarantee that the log start offset of a follower should always be less than or equal to that of the leader. However, unless we make log start offset acknowledgement a criteria for ISR membership, we cannot actually guarantee this. In the case of multiple leader elections occurring in a short time period, it is possible for this guarantee to be violated. Nevertheless, we think it may still be a good idea to rely on log start offset propagation for follower deletion, but we will consider this separately.