
Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

It is common to have a Kafka cluster spanning multiple datacenters. For example, a common deployment is within an AWS region in which each availability zone is treated as a datacenter. Currently, Kafka has some basic support for rack awareness which can be used in this scenario to control replica placement (by treating the availability zone as a rack). However, currently consumers are limited to fetching only from the leader, so there is no easy way to leverage locality in order to reduce expensive cross-dc traffic. To address this gap, we propose here to allow consumers to fetch from the closest replica.

Proposed Changes

In order to support fetching from the closest replica, we need 1) support in the Fetch protocol for fetching from followers, and 2) some way for the consumer to determine its proximity to the replicas of a partition.

Follower Fetching

We propose to extend the fetch protocol to allow fetching from replicas in the ISR. The reason to restrict fetching to the ISR is to ensure that the consumer sees the state of the log as accurately as possible, so that there is no unnecessary latency in receiving the latest data.

In general, a follower sees a stale view of partition metadata, including the current high watermark, the log start offset, and the current ISR. The implication of a lagging high watermark is that fetching from followers may have slightly higher latency for consumers. The implication of a lagging log start offset and ISR state is that the consumer's fetch offset may become out of range when it changes the replica it is fetching from. In fact, there are currently no strong guarantees on the consistency of the log start offset between replicas (even those in the ISR). We deal with these points in more detail below:

High watermark propagation

Some extra latency is expected when fetching from followers, but we should ensure that there are no unnecessary delays in the propagation of the high watermark. Following the improvements in KIP-227, we can use the fetch session to track the high watermark of the follower. Currently, updating the follower's high watermark could be delayed by as long as replica.fetch.max.wait.ms if there is no new data written to the partition.

We propose to change the fetch satisfaction logic to take high watermark updates into account. The leader will respond to a fetch immediately if the follower has a stale high watermark. In particular, the first request in a fetch session will not block on the leader in order to ensure that the follower has the latest high watermark for requested partitions. A side benefit of this change is that it reduces the window following a leader election in which the high watermark can be observed to go backwards (see KIP-207).
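The proposed satisfaction logic can be sketched as follows. This is an illustrative outline only; the class and parameter names are assumptions and do not correspond to actual broker internals.

```java
// Sketch of the proposed fetch satisfaction check on the leader.
// Names are illustrative, not the real broker code.
class FetchSatisfaction {
    /**
     * Decide whether the leader should answer a fetch immediately.
     *
     * @param sessionHighWatermark high watermark last sent to this follower
     *                             through the fetch session, or -1 for a new session
     * @param currentHighWatermark the leader's current high watermark
     * @param hasNewData           whether new records are available for the fetch
     */
    static boolean shouldRespondImmediately(long sessionHighWatermark,
                                            long currentHighWatermark,
                                            boolean hasNewData) {
        // The first request in a fetch session never blocks, so the
        // follower learns the latest high watermark right away.
        if (sessionHighWatermark < 0)
            return true;
        // A stale high watermark now satisfies the fetch, even when no
        // new data has been written to the partition.
        if (sessionHighWatermark < currentHighWatermark)
            return true;
        // Otherwise fall back to the existing criteria (new data, min
        // bytes, or the replica.fetch.max.wait.ms timeout).
        return hasNewData;
    }
}
```

The key change relative to current behavior is the second check: today, a follower with an up-to-date log but a stale high watermark could wait the full replica.fetch.max.wait.ms before learning the new high watermark.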

Out of range handling

Due to the ISR change propagation delay mentioned above and the weak consistency of the log start offset, it is possible that an offset which is valid on one broker may be out of range on another. The client needs a way to reason about these errors so that it can reliably find the next offset to fetch from without weakening its delivery semantics.

...

  1. We are adding error codes to distinguish out of range cases (FETCH_OFFSET_TOO_SMALL and FETCH_OFFSET_TOO_LARGE).
  2. We will skip the offset truncation check from KIP-320 when we see FETCH_OFFSET_TOO_SMALL.
  3. When resetting to the earliest offset, we need to query the replica that we will be fetching from.
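The numbered steps above can be sketched as a decision on the consumer side. The enum and method names here are assumptions for illustration, not the actual client code.

```java
// Illustrative sketch of the consumer-side out of range handling
// described above; names are assumptions, not the real client code.
class OutOfRangeHandling {
    enum FetchError { FETCH_OFFSET_TOO_SMALL, FETCH_OFFSET_TOO_LARGE }

    enum Action { RESET_TO_EARLIEST_ON_FETCHED_REPLICA, CHECK_TRUNCATION_THEN_RESET }

    static Action handle(FetchError error) {
        switch (error) {
            case FETCH_OFFSET_TOO_SMALL:
                // Skip the KIP-320 truncation check: an offset below the
                // log start offset cannot have been lost to truncation.
                // Query the replica we are fetching from for its earliest
                // offset rather than the leader, since log start offsets
                // are only weakly consistent between replicas.
                return Action.RESET_TO_EARLIEST_ON_FETCHED_REPLICA;
            case FETCH_OFFSET_TOO_LARGE:
            default:
                // The offset may have been truncated after a leader
                // change, so the KIP-320 truncation check still applies.
                return Action.CHECK_TRUNCATION_THEN_RESET;
        }
    }
}
```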

Finding the preferred follower

The consumer needs to have some logic to determine which replicas in the ISR should be preferred for fetching. It is difficult to make this logic general for all use cases because there could be many factors at play. For example, a user may wish to take into account the dollar cost of the link, the available bandwidth, the proximity to the consumer, load balancing, etc. We therefore propose to make this logic pluggable through the ReplicaSelector interface. This will be exposed to the consumer with the replica.selector.class configuration. If no selector is provided (the default), then the consumer will follow the current model of fetching only from the leader.
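Configuring a custom selector might look like the following. The selector class name com.example.RackAwareSelector is hypothetical; only the replica.selector.class property comes from this proposal.

```java
import java.util.Properties;

// Example consumer configuration enabling a custom replica selector.
// The selector class name is a hypothetical placeholder.
class SelectorConfigExample {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        // If this property is omitted (the default), the consumer
        // continues to fetch only from the leader.
        props.put("replica.selector.class", "com.example.RackAwareSelector");
        return props;
    }
}
```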

Public Interfaces

Protocol Changes

The FetchRequest schema has a field for the replica id. Consumers typically use the sentinel -1, which indicates that fetching is only allowed from the leader. A lesser-known sentinel is -2, which was originally intended to be used for debugging and allows fetching from followers. We propose to let the consumer use this to indicate the intent to allow fetching from a follower. Similarly, when we need to send a ListOffsets request to a follower in order to find the log start offset, we will use the same sentinel for the replica id field.
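The sentinel convention can be summarized as below. The constant names are illustrative; only the values -1 and -2 and their meanings come from the text above.

```java
// Sentinel replica ids carried in the FetchRequest and ListOffsets
// replica id field. Constant names here are illustrative only.
class ReplicaIdSentinels {
    static final int CONSUMER_REPLICA_ID = -1;  // fetching allowed only from the leader
    static final int DEBUGGING_REPLICA_ID = -2; // fetching from followers allowed

    static boolean allowsFollowerFetching(int replicaId) {
        return replicaId == DEBUGGING_REPLICA_ID;
    }
}
```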

It is still necessary, however, to bump the version of the Fetch request because of the new error codes. The request and response schemas will be unchanged, but instead of returning OFFSET_OUT_OF_RANGE following an out of range error, the broker will return the more specific FETCH_OFFSET_TOO_LARGE and FETCH_OFFSET_TOO_SMALL error codes.

Consumer Changes

We will expose a new plugin interface that allows the user to provide custom logic to determine the preferred replica to fetch from.

...

Code Block
public class LeaderSelector implements ReplicaSelector {
    @Override
    public Node select(TopicPartition partition, Node leader, List<Node> isr) {
        return leader;
    }
}


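For illustration, a rack-aware selector against the same interface might look like the following. The Node and TopicPartition stand-ins exist only to keep the sketch self-contained; a real implementation would use org.apache.kafka.common.Node and org.apache.kafka.common.TopicPartition, and the way the consumer's own rack reaches the selector (here a constructor argument) is an assumption.

```java
import java.util.List;

// Minimal stand-ins for the Kafka types, so the sketch is self-contained.
class Node {
    private final int id;
    private final String rack;

    Node(int id, String rack) {
        this.id = id;
        this.rack = rack;
    }

    int id() { return id; }
    String rack() { return rack; }
}

class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

interface ReplicaSelector {
    Node select(TopicPartition partition, Node leader, List<Node> isr);
}

// Prefers an in-sync replica in the consumer's own rack (e.g. its
// availability zone), falling back to the leader when none matches.
class RackAwareSelector implements ReplicaSelector {
    private final String clientRack; // assumed to come from selector configuration

    RackAwareSelector(String clientRack) {
        this.clientRack = clientRack;
    }

    @Override
    public Node select(TopicPartition partition, Node leader, List<Node> isr) {
        for (Node replica : isr) {
            if (clientRack.equals(replica.rack()))
                return replica;
        }
        return leader;
    }
}
```

Falling back to the leader keeps the selector safe when no in-rack replica is in the ISR, matching the current default behavior.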
Compatibility, Deprecation, and Migration Plan

This change is backwards compatible with previous versions. If the broker does not support follower fetching, we will revert to the old behavior of fetching from the leader.

Rejected Alternatives

  • We have considered allowing the consumer to use older versions of the Fetch API. The only complication is that older versions will not allow us to use the more specific out of range error codes. This complicates out of range handling because we do not know whether we should expect the out of range offset to exist on the leader or not. If the offset is too small on the follower, it may still be in range on the leader. One option is to always do offset reconciliation on the replica that the consumer is fetching from. The downside of this is that we cannot detect situations when the consumer has seen a later offset than a replica due to stale propagation of the ISR state. 
  • We have mentioned the difficulty of reasoning about the consistency of the log start offset due to the fact that the retention is enforced independently by all replicas. One option we have considered is to only allow the leader to enforce retention and to rely only on the propagation of the log start offset in the Fetch response for follower deletion. In principle, this would give us the guarantee that the log start offset of a follower should always be less than or equal to that of the leader. However, unless we make log start offset acknowledgement a criteria for ISR membership, we cannot actually guarantee this. In the case of multiple leader elections occurring in a short time period, it is possible for this guarantee to be violated. Nevertheless, we think it may still be a good idea to rely on log start offset propagation for follower deletion, but we will consider this separately.

...