Status

Current state: Accepted

Discussion thread:

JIRA: KAFKA-8443

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

In order to support fetching from the closest replica, we need 1) to add support in the Fetch protocol for follower fetching, and 2) a mechanism to find the "closest" replica to a given consumer.

Follower Fetching

We propose to extend the fetch protocol to allow consumer fetching from any replica. Just as when fetching from the leader, the replication protocol will guarantee that only committed data is returned to the consumer. This relies on propagation of the high watermark through the leader's fetch response. Due to this propagation delay, follower fetching may have higher latency. Additionally, follower fetching introduces the possibility of spurious out of range errors if a fetch is received by an out-of-sync replica. The diagram below illustrates the different cases that are possible.

...

Case 2 (unavailable offset): The offset is not available locally, but is known to be committed. This can occur if a fetch is received by an out-of-sync replica. Even out-of-sync replicas will continue fetching from the leader, and they may have received a high watermark value which is larger than their log end offset. So a fetch at an offset between the log end offset and the high watermark received from the leader is known to be committed even though the replica does not have the data.

Ideally, consumers should fetch only from the ISR, but it is not possible to prevent fetching from outside the ISR because it takes time for the ISR state to propagate to each replica. We propose here to also return OFFSET_NOT_AVAILABLE for this case, which will cause the consumer to retry. In the common case, we expect the out-of-sync replica to return to the ISR after a short time. On the other hand, if the replica remains out-of-sync for a longer duration, the consumer will have a chance to find a new replica after some time. We discuss this point in more detail below in the section on how the consumer finds its preferred replica.

...

In fact, the logic to handle this case is already outlined in KIP-320. The consumer will reconcile its current offset with the leader and then will resume fetching. The only change we make here is to request a metadata update if we find that the current offset is still valid on the leader. This gives us a chance to detect any ISR changes before we resume fetching from the same replica.

...

The benefit of letting the broker decide which replica is preferred for a client is that it can take load into account. For example, the broker can round robin between nearby replicas in order to spread the load. There are many such considerations that are possible, so we propose to allow users to provide a ReplicaSelector plugin to the broker in order to handle the logic. This will be exposed to the broker with the replica.selector.class configuration. In order to make use of this plugin, we will extend the Fetch API so that clients can provide their own location information. In the response, the broker will indicate a preferred replica to fetch from.

...

Consumer API 

We use the "broker.rack" configuration to identify the location of the broker, so we will add a similar configuration to the consumer. This will be used in fetch requests as documented below.

In this proposal, the consumer will always fetch from the preferred replica returned in the fetch response, regardless of whether a rack ID is provided.

Configuration Name    Valid Values         Default Value
client.rack           <nullable string>    null
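
For illustration only, a consumer located in rack "us-east-1a" might be configured as follows. This is a minimal sketch: the class name, bootstrap server, group id, and topic are placeholders rather than part of the proposal.

Code Block
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RackAwareConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        props.put("group.id", "my-group");               // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // New configuration from this KIP: identifies the location of the consumer,
        // which the broker's ReplicaSelector can match against broker.rack
        props.put("client.rack", "us-east-1a");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
        }
    }
}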

Broker API 

We will expose a new plugin interface configured through the "replica.selector.class" configuration  that allows users to provide custom logic to determine the preferred replica to fetch from.

Configuration Name        Valid Values                                     Default Value
replica.selector.class    class name of ReplicaSelector implementation    LeaderSelector

By default, Kafka will use an implementation which always returns the current partition leader (if one exists). This is for backwards compatibility.

The ReplicaSelector interface is provided below. We will pass the selector client metadata, which includes the rackId provided through the Fetch request as well as other connection-related information.

Code Block
interface ClientMetadata {
  String rackId();
  String clientId();
  InetAddress clientAddress();
  KafkaPrincipal principal();
  String listenerName();
}

interface ReplicaView {
  Node endpoint();
  long logEndOffset();
  long timeSinceLastCaughtUpMs();
}

interface PartitionView {
  Set<ReplicaView> replicas();
  Optional<ReplicaView> leader();
}

interface ReplicaSelector extends Configurable, Closeable {
    /**
     * Select the preferred replica the client should use for fetching.
     * If no replica is available, this method should return an empty optional.
     */
    Optional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionView partitionView);

    /**
     * Optional custom configuration
     */
    default void configure(Map<String, ?> configs) {}

    /**
     * Optional shutdown hook
     */
    default void close() {}
}


We will provide two implementations out of the box. One of these is the LeaderSelector mentioned above.

Code Block
class LeaderSelector implements ReplicaSelector {

  Optional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionView partitionView) {
    return partitionView.leader();
  }
}


The second uses the rackId provided by the clients and implements exact matching on the rackId from replicas.

Code Block
class RackAwareReplicaSelector implements ReplicaSelector {

  Optional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionView partitionView) {
    // if rackId is not null, iterate through the online replicas 
    // if one or more exists with matching rackId, choose the most caught-up replica from among them
    // otherwise return the current leader
  }
}
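
For illustration only, the commented steps above could be filled in roughly as follows, using the ClientMetadata, ReplicaView, and PartitionView interfaces defined earlier. The tie-breaking between equally caught-up replicas is an assumption of this sketch and is not specified by the KIP.

Code Block
import java.util.Comparator;
import java.util.Optional;
import org.apache.kafka.common.TopicPartition;

class RackAwareReplicaSelector implements ReplicaSelector {

  @Override
  public Optional<ReplicaView> select(TopicPartition topicPartition, ClientMetadata metadata, PartitionView partitionView) {
    String rackId = metadata.rackId();
    if (rackId != null && !rackId.isEmpty()) {
      // Consider only replicas whose broker rack matches the client's rack
      Optional<ReplicaView> mostCaughtUp = partitionView.replicas().stream()
          .filter(replica -> rackId.equals(replica.endpoint().rack()))
          // Prefer the replica with the highest log end offset, breaking ties in
          // favor of the replica that has most recently been caught up
          .max(Comparator.comparingLong(ReplicaView::logEndOffset)
              .thenComparing(Comparator.comparingLong(ReplicaView::timeSinceLastCaughtUpMs).reversed()));
      if (mostCaughtUp.isPresent())
        return mostCaughtUp;
    }
    // Otherwise fall back to the current leader, which may be empty during an election
    return partitionView.leader();
  }
}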

Protocol Changes

We will extend the Fetch API in order to enable selection of the preferred replica. The fetch request will include the rack ID from the "client.rack" configuration described above, which identifies the location of the consumer.

Code Block
{
  "apiKey": 1,
  "type": "request",
  "name": "FetchRequest",
  "validVersions": "0-11",
  "fields": [
    {
      "name": "ReplicaId",
 "type": "int32",
      "versions": "0+",
      "about": "The broker ID of the follower, of -1 if this request is from a consumer."
    },
    {
      "name": "MaxWait",
      "type": "int32",
      "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response."
    },
    {
      "name": "MinBytes",
      "type": "int32",
      "versions": "0+",
      "about": "The minimum bytes to accumulate in the response."
    },
    {
      "name": "MaxBytes",
      "type": "int32",
      "versions": "3+",
      "default": "0x7fffffff",
      "ignorable": true,
      "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored."
    },
    {
      "name": "IsolationLevel",
      "type": "int8",
      "versions": "4+",
      "default": "0",
      "ignorable": false,
      "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records"
    },
    {
      "name": "SessionId",
      "type": "int32",
      "versions": "7+",
      "default": "0",
      "ignorable": false,
      "about": "The fetch session ID."
    },
    {
      "name": "Epoch",
      "type": "int32",
      "versions": "7+",
      "default": "-1",
      "ignorable": false,
      "about": "The fetch session ID."
    },
    {
      "name": "Topics",
      "type": "[]FetchableTopic",
      "versions": "0+",
      "about": "The topics to fetch.",
      "fields": [
        {
          "name": "Name",
          "type": "string",
          "versions": "0+",
          "entityType": "topicName",
          "about": "The name of the topic to fetch."
        },
        {
          "name": "FetchPartitions",
          "type": "[]FetchPartition",
          "versions": "0+",
          "about": "The partitions to fetch.",
          "fields": [
            {
              "name": "PartitionIndex",
              "type": "int32",
              "versions": "0+",
              "about": "The partition index."
            },
            {
              "name": "CurrentLeaderEpoch",
              "type": "int32",
              "versions": "9+",
              "default": "-1",
              "ignorable": true,
              "about": "The current leader epoch of the partition."
            },
            {
              "name": "FetchOffset",
              "type": "int64",
              "versions": "0+",
              "about": "The message offset."
            },
            {
              "name": "LogStartOffset",
              "type": "int64",
              "versions": "5+",
              "default": "-1",
              "ignorable": false,
              "about": "The earliest available offset of the follower replica. The field is only used when the request is sent by the follower."
            },
            {
              "name": "MaxBytes",
              "type": "int32",
              "versions": "0+",
              "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored."
            }
          ]
        }
      ]
    },
    {
      "name": "Forgotten",
      "type": "[]ForgottenTopic",
      "versions": "7+",
      "ignorable": false,
      "about": "In an incremental fetch request, the partitions to remove.",
      "fields": [
        {
          "name": "Name",
          "type": "string",
          "versions": "7+",
          "entityType": "topicName",
          "about": "The partition name."
        },
        {
          "name": "ForgottenPartitionIndexes",
          "type": "[]int32",
          "versions": "7+",
          "about": "The partitions indexes to forget."
        }
      ]
    },
    {
      "name": "RackId",
      "type": "string",
      "versions": "11+",
      "default": "",
      "ignorable": true,
      "about": "Rack ID of the consumer making this request"
    }
  ]
}


The fetch response will indicate the preferred replica. 

Code Block
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-11",
  "fields": [
    {
      "name": "ThrottleTimeMs",
      "type": "int32",
      "versions": "1+",
      "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."
    },
    {
      "name": "ErrorCode",
      "type": "int16",
      "versions": "7+",
      "ignorable": false,
      "about": "The top level response error code."
    },
    {
      "name": "SessionId",
      "type": "int32",
      "versions": "7+",
      "default": "0",
      "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session."
    },
    {
      "name": "Topics",
      "type": "[]FetchableTopicResponse",
      "versions": "0+",
      "about": "The response topics.",
      "fields": [
        {
          "name": "Name",
          "type": "string",
          "versions": "0+",
          "entityType": "topicName",
          "about": "The topic name."
        },
        {
          "name": "Partitions",
          "type": "[]FetchablePartitionResponse",
          "versions": "0+",
          "about": "The topic partitions.",
          "fields": [
            {
              "name": "PartitionIndex",
              "type": "int32",
              "versions": "0+",
              "about": "The partiiton index."
            },
            {
              "name": "ErrorCode",
              "type": "int16",
              "versions": "0+",
              "about": "The error code, or 0 if there was no fetch error."
            },
            {
              "name": "HighWatermark",
              "type": "int64",
              "versions": "0+",
              "about": "The current high water mark."
            },
            {
              "name": "LastStableOffset",
              "type": "int64",
              "versions": "4+",
              "default": "-1",
              "ignorable": true,
              "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"
            },
            {
              "name": "LogStartOffset",
              "type": "int64",
              "versions": "5+",
              "default": "-1",
              "ignorable": true,
              "about": "The current log start offset."
            },
            {
              "name": "Aborted",
              "type": "[]AbortedTransaction",
              "versions": "4+",
              "nullableVersions": "4+",
              "ignorable": false,
              "about": "The aborted transactions.",
              "fields": [
                {
                  "name": "ProducerId",
                  "type": "int64",
                  "versions": "4+",
                  "entityType": "producerId",
                  "about": "The producer id associated with the aborted transaction."
                },
                {
                  "name": "FirstOffset",
                  "type": "int64",
                  "versions": "4+",
                  "about": "The first offset in the aborted transaction."
                }
              ]
            },
            {
              "name": "PreferredReadReplica",
              "type": "int32",
              "versions": "11+",
              "ignorable": true,
              "about": "The preferred read replica for the consumer to use on its next fetch request"
            },
            {
              "name": "Records",
              "type": "bytes",
              "versions": "0+",
              "nullableVersions": "0+",
              "about": "The record data."
            }
          ]
        }
      ]
    }
  ]
}


The FetchRequest schema has a field for the replica id. Consumers typically use the sentinel -1, which indicates that fetching is only allowed from the leader. A lesser-known sentinel is -2, which was originally intended to be used for debugging and allows fetching from followers. We propose to let the consumer use this to indicate the intent to allow fetching from a follower. Similarly, when we need to send a ListOffsets request to a follower in order to find the log start offset, we will use the same sentinel for the replica id field.

It is still necessary, however, to bump the version of the Fetch request because of the use of the OFFSET_NOT_AVAILABLE error code mentioned above. Other than the new fields shown above, the request and response schemas will not change. We will also modify the behavior of all fetch versions so that the current log start offset and high watermark are returned if the fetch offset is out of range. This allows the consumer to distinguish out of range errors.

One point that was missed in KIP-320 is the fact that the OffsetsForLeaderEpoch API exposes the log end offset. For consumers, the log end offset should be the high watermark, so we need a way for this protocol to distinguish replica and consumer requests. We propose here to add the same `replica_id` field that is used in the Fetch and ListOffsets APIs. The new schema is provided below:

Code Block
OffsetsForLeaderEpochRequest => [Topic]
  ReplicaId => INT32              // New (-1 means consumer)
  Topic => TopicName [Partition]
    TopicName => STRING
    Partition => PartitionId CurrentLeaderEpoch LeaderEpoch
      PartitionId => INT32
      CurrentLeaderEpoch => INT32
      LeaderEpoch => INT32

When an OffsetsForLeaderEpoch request is received from a consumer, the returned offset will be limited to the high watermark. As with the Fetch API, when a leader has just been elected, the true high watermark is not known for a short time. If an OffsetsForLeaderEpoch request is received with the latest epoch during this window, the leader will respond with the OFFSET_NOT_AVAILABLE error code. This will cause the consumer to retry. Note as well that only leaders are allowed to handle OffsetsForLeaderEpoch queries. 

A new JMX metric will be added to the Java client when it has a preferred read replica. Under the "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=*,partition=*" object, a new attribute "preferred-read-replica" will be added. This metric will have a value equal to the broker ID which the consumer is currently fetching from. If the attribute is missing or set to -1, it means the consumer is fetching from the leader.
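
For illustration, the new attribute could be read over JMX roughly as follows, assuming the code runs in the same JVM as the consumer (a remote JMX connection would otherwise be needed). The client id, topic, and partition in the object name are placeholders.

Code Block
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class PreferredReadReplicaCheck {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Placeholders: substitute the client id, topic, and partition of the consumer being inspected
        ObjectName name = new ObjectName(
            "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1,topic=my-topic,partition=0");
        Object preferred = server.getAttribute(name, "preferred-read-replica");
        // -1 (or a missing attribute) means the consumer is fetching from the partition leader
        System.out.println("preferred-read-replica = " + preferred);
    }
}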

Compatibility, Deprecation, and Migration Plan

This change is backwards compatible with previous versions. If the broker does not support follower fetching, the consumer will revert to the old behavior of fetching from the leader.

Rejected Alternatives

  • Consumer Replica Selection: The first iteration of this KIP proposed to let the consumer decide which replica to fetch from using an interface similar to the one we have above. Ultimately we felt it was easier to reason about the system in a multi-tenant environment when replica selection logic could be controlled by the broker. It is much more difficult to have common replica selection logic through a client plugin since it involves coordinating dependencies and configuration across many applications. For the same reason, we also rejected the option to specify whether the preferred replica should be used by the client. 
  • Handling Old Fetch Versions: We have considered allowing the consumer to use older versions of the Fetch API. The only complication is that older versions will not allow us to use the more specific out of range error codes. This complicates out of range handling because we do not know whether we should expect the out of range offset to exist on the leader or not. If the offset is too small on the follower, it may still be in range on the leader. One option is to always do offset reconciliation on the replica that the consumer is fetching from. The downside of this is that we cannot detect situations when the consumer has seen a later offset than a replica due to stale propagation of the ISR state.
  • Improved Log Start Offset Handling: We have mentioned the difficulty of reasoning about the consistency of the log start offset due to the fact that the retention is enforced independently by all replicas. One option we have considered is to only allow the leader to enforce retention and to rely only on the propagation of the log start offset in the Fetch response for follower deletion. In principle, this would give us the guarantee that the log start offset of a follower should always be less than or equal to that of the leader. However, unless we make log start offset acknowledgement a criteria for ISR membership, we cannot actually guarantee this. In the case of multiple leader elections occurring in a short time period, it is possible for this guarantee to be violated. Nevertheless, we think it may still be a good idea to rely on log start offset propagation for follower deletion, but we will consider this separately.

...