Status

Current state: Under Discussion

Discussion thread:

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

It is common to have a Kafka cluster spanning multiple datacenters. For example, a common deployment is within an AWS region in which each availability zone is treated as a datacenter. Kafka already has some basic support for rack awareness, which can be used in this scenario to control replica placement (by treating the availability zone as a rack). However, consumers are currently limited to fetching only from the leader, so there is no easy way to leverage locality in order to reduce expensive cross-datacenter traffic. To address this gap, we propose here to allow consumers to fetch from the closest replica.

Proposed Changes

In order to support fetching from the closest replica, we need 1) to add support in the Fetch protocol for follower fetching, and 2) some way for the consumer to determine its distance to the replicas of a partition. 

Follower Fetching

We propose to extend the fetch protocol to allow fetching from replicas in the ISR. The reason to restrict fetching to the ISR is to ensure that the consumer sees the state of the log as accurately as possible so that there is no unnecessary latency in getting the latest data.

In general, a follower sees a stale view of partition metadata including the current high watermark, the log start offset, and the current ISR. The implication of having a lagging high watermark is that fetching from followers may have slightly higher latency for consumers. The implication of a lagging log start offset and ISR state is that the consumer's fetch offset may become out of range when changing the replica that it is fetching from. In fact, there are currently no strong guarantees on the consistency of the log start offset between replicas (even those in the ISR). We deal with these points in more detail below:

High watermark propagation

Some extra latency is expected when fetching from followers, but we should ensure that there are no unnecessary delays in the propagation of the high watermark. Following the improvements in KIP-227, we can use the fetch session to track the high watermark of the follower. Currently, updating the follower's high watermark could be delayed by as long as replica.fetch.max.wait.ms if there is no new data written to the partition.

We propose to change the fetch satisfaction logic to take high watermark updates into account. The leader will respond to a fetch immediately if the follower has a stale high watermark. In particular, the first request in a fetch session will not block on the leader in order to ensure that the follower has the latest high watermark for requested partitions. A side benefit of this change is that it reduces the window following a leader election in which the high watermark can be observed to go backwards (see KIP-207).
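The proposed satisfaction rule can be sketched as a small predicate. This is a minimal illustration and not the broker's actual code: the class name, method name, and parameters are hypothetical, and the final byte-count check only stands in for the existing minimum-bytes condition on fetch responses.

```java
// Hypothetical helper for illustration; none of these names come from the Kafka code base.
final class FollowerFetchSatisfaction {
    private FollowerFetchSatisfaction() {}

    /**
     * Returns true if the leader should answer a follower fetch without waiting.
     *
     * @param isFirstFetchInSession true for the first request of a fetch session
     * @param followerHighWatermark high watermark last sent to the follower in this session
     * @param leaderHighWatermark   the leader's current high watermark
     * @param availableBytes        bytes currently available for the requested partitions
     * @param minBytes              minimum response size requested by the follower
     */
    static boolean shouldRespondImmediately(boolean isFirstFetchInSession,
                                            long followerHighWatermark,
                                            long leaderHighWatermark,
                                            int availableBytes,
                                            int minBytes) {
        // The first request in a session never blocks, so the follower learns the high watermark.
        if (isFirstFetchInSession) {
            return true;
        }
        // A stale high watermark on the follower is enough to complete the fetch.
        if (followerHighWatermark < leaderHighWatermark) {
            return true;
        }
        // Otherwise fall back to the usual "enough data" condition.
        return availableBytes >= minBytes;
    }
}
```

With this rule, a follower that is caught up on data but behind on the high watermark gets a response right away instead of waiting for the fetch timeout.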

Out of range handling

Due to the ISR change propagation delay mentioned above and the weak consistency of the log start offset, it is possible that an offset which is valid on one broker may be out of range on another. The client needs a way to reason about these errors so that it can reliably find the next offset to fetch from without weakening its delivery semantics.

The first improvement we make is to allow the broker to distinguish whether an out of range offset was too low or too high. Rather than the single OFFSET_OUT_OF_RANGE error, we will introduce two new error codes: FETCH_OFFSET_TOO_LARGE and FETCH_OFFSET_TOO_SMALL. Generally, we have stronger guarantees when it comes to reasoning about the consistency of the log end offset, so client handling is simpler when the consumer knows that the fetch offset was too large rather than too small.

FETCH_OFFSET_TOO_SMALL: Due to the weak consistency of the log start offset, it is possible to receive the FETCH_OFFSET_TOO_SMALL error code when switching fetching between replicas. Reasoning about this inconsistency is complex without stronger guarantees. However, we expect the log start offset of replicas to be relatively close, so the benefit of any additional handling seems minimal. Here we propose to skip offset reconciliation as outlined in KIP-320 when handling this error and simply resort to the offset reset policy.

Note that when the offset reset policy is set to "earliest," we need to find the log start offset of the current replica we are fetching from. The log start offset for the leader is not guaranteed to be present on the follower we are fetching from and we do not want the consumer to get stuck in a reset loop while we wait for convergence. 

To summarize, when the consumer receives the FETCH_OFFSET_TOO_SMALL error code, the consumer will take the following steps:

  1. If the reset policy is "earliest," fetch the log start offset of the current replica that raised the out of range error
  2. If the reset policy is "latest," fetch the log end offset from the leader
  3. If the reset policy is "none," raise an exception
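The three steps above can be written as a simple dispatch on the reset policy. This is only a sketch: the enum and method names are hypothetical and do not correspond to classes in the consumer.

```java
// Hypothetical names for illustration only.
final class FetchOffsetTooSmallHandling {
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    enum Action {
        FETCH_LOG_START_OFFSET_FROM_CURRENT_REPLICA,
        FETCH_LOG_END_OFFSET_FROM_LEADER,
        RAISE_EXCEPTION
    }

    private FetchOffsetTooSmallHandling() {}

    /** Maps the configured reset policy to the step taken on FETCH_OFFSET_TOO_SMALL. */
    static Action onFetchOffsetTooSmall(ResetPolicy policy) {
        switch (policy) {
            case EARLIEST:
                // Ask the replica that raised the error, not the leader, to avoid a reset loop.
                return Action.FETCH_LOG_START_OFFSET_FROM_CURRENT_REPLICA;
            case LATEST:
                return Action.FETCH_LOG_END_OFFSET_FROM_LEADER;
            case NONE:
                return Action.RAISE_EXCEPTION;
            default:
                throw new IllegalArgumentException("Unknown reset policy: " + policy);
        }
    }
}
```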

FETCH_OFFSET_TOO_LARGE: The FETCH_OFFSET_TOO_LARGE error code is more interesting. Due to metadata propagation delays, this error may be spurious. When a leader shrinks the ISR, the controller is notified through ZooKeeper. The controller then sends LeaderAndIsr requests to the replicas of affected partitions. During this period, the follower will not know that it is out of sync and will still respond to fetch requests. This is similar to the problem of a stale leader responding to fetch requests before it realizes that the controller has elected another leader.

In fact, the logic to handle this case is already outlined in KIP-320. The consumer will reconcile its current offset with the leader and then will resume fetching. The only change we make here is to request a metadata update if we find that the current offset is still valid on the leader. This gives us a chance to detect any ISR changes before we resume fetching from the same replica.

It may not be immediately obvious that the OffsetsForLeaderEpoch API can be used to determine if an offset is still valid. The consumer will request from the leader the end offset for the epoch corresponding to its current fetch offset. If the fetch offset has been truncated in an unclean leader election, then the OffsetsForLeaderEpoch response will return either 1) the same epoch with an end offset smaller than the fetch offset, or 2) a smaller epoch with its end offset. Otherwise, if the offset exists, then the response will include the same epoch and an end offset which is larger than the fetch offset.

So to handle the FETCH_OFFSET_TOO_LARGE error code, the consumer will take the following steps:

  1. Use the OffsetsForLeaderEpoch API to verify the current position with the leader.
    1. If the fetch offset is still valid, refresh metadata and continue fetching
    2. If truncation was detected, follow the steps in KIP-320 to either reset the offset or raise the truncation error
    3. Otherwise, follow the same steps above in the FETCH_OFFSET_TOO_SMALL handling depending on the reset policy

If we do not have an epoch corresponding to the current fetch offset, then we will skip the truncation check and use the same handling that we used for the FETCH_OFFSET_TOO_SMALL case (basically we resort to the reset policy). In practice, out of range errors due to ISR propagation delays should be extremely rare because they require a consumer to see a committed offset before a follower that is still considered to be in the ISR.
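The decision described above for FETCH_OFFSET_TOO_LARGE can be sketched as a classification of the leader's OffsetsForLeaderEpoch response. All names here are hypothetical. Two points are assumptions of this sketch rather than statements from the proposal: a negative response epoch is treated as "epoch unknown to the leader", and the boundary case in which the returned end offset equals the fetch offset is routed to the fallback branch because the text does not spell it out.

```java
import java.util.OptionalInt;

// Hypothetical names for illustration only.
final class FetchOffsetTooLargeHandling {
    enum Outcome { REFRESH_METADATA_AND_CONTINUE, HANDLE_TRUNCATION, APPLY_RESET_POLICY }

    private FetchOffsetTooLargeHandling() {}

    /**
     * @param fetchOffset       the consumer's current fetch offset
     * @param fetchEpoch        epoch of the fetch offset, empty if the consumer does not know it
     * @param responseEpoch     epoch returned by the leader (negative assumed to mean undefined)
     * @param responseEndOffset end offset returned by the leader for that epoch
     */
    static Outcome classify(long fetchOffset, OptionalInt fetchEpoch,
                            int responseEpoch, long responseEndOffset) {
        // Without an epoch the truncation check is skipped entirely.
        if (!fetchEpoch.isPresent()) {
            return Outcome.APPLY_RESET_POLICY;
        }
        int epoch = fetchEpoch.getAsInt();
        // Same epoch and a larger end offset: the position is still valid on the leader.
        if (responseEpoch == epoch && responseEndOffset > fetchOffset) {
            return Outcome.REFRESH_METADATA_AND_CONTINUE;
        }
        boolean sameEpochButShorter = responseEpoch == epoch && responseEndOffset < fetchOffset;
        boolean smallerEpoch = responseEpoch >= 0 && responseEpoch < epoch;
        if (sameEpochButShorter || smallerEpoch) {
            return Outcome.HANDLE_TRUNCATION; // continue with the KIP-320 steps
        }
        // Anything else falls back to the same handling as FETCH_OFFSET_TOO_SMALL.
        return Outcome.APPLY_RESET_POLICY;
    }
}
```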

TLDR: The only changes here are the following:

  1. We are adding error codes to distinguish out of range cases (FETCH_OFFSET_TOO_SMALL and FETCH_OFFSET_TOO_LARGE).
  2. We will skip the offset truncation check from KIP-320 when we see FETCH_OFFSET_TOO_SMALL.
  3. When resetting to the earliest offset, we need to query the replica that we will be fetching from.

Finding the preferred follower

The consumer needs to have some logic to determine which replicas in the ISR should be preferred for fetching. It is difficult to make this logic general for all use cases because there could be many factors at play. For example, a user may wish to take into account the dollar cost of the link, the available bandwidth, the proximity to the consumer, load balancing, etc. We therefore propose to make this logic pluggable through the ReplicaSelector interface. This will be exposed to the consumer with the replica.selector.class configuration. If no selector is provided (the default), then the consumer will follow the current model of fetching only from the leader.

Public Interfaces

Protocol Changes

The FetchRequest schema has a field for the replica id. Consumers typically use the sentinel -1, which indicates that fetching is only allowed from the leader. A lesser known sentinel is -2, which was originally intended to be used for debugging and allows fetching from followers. We propose to let the consumer use this to indicate the intent to allow fetching from a follower. Similarly, when we need to send a ListOffsets request to a follower in order to find the log start offset, we will use the same sentinel for the replica id field.

It is still necessary, however, to bump the version of the Fetch request because of the new error codes. The request and response schemas will be unchanged, but instead of returning OFFSET_OUT_OF_RANGE following an out of range error, the broker will return the more specific FETCH_OFFSET_TOO_LARGE and FETCH_OFFSET_TOO_SMALL error codes.
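The sentinel choice and the version-dependent error mapping can be sketched as follows. The class, constant, and method names are hypothetical, and the `supportsSpecificErrors` flag stands in for "the request uses the bumped Fetch version", whose number is not given here. The range check against the log start and log end offsets is a simplification for illustration.

```java
// Hypothetical names for illustration only.
final class FetchProtocolSketch {
    static final int LEADER_ONLY_REPLICA_ID = -1;      // the usual consumer sentinel
    static final int FOLLOWER_ALLOWED_REPLICA_ID = -2; // the debugging sentinel reused by this proposal

    enum FetchError { NONE, OFFSET_OUT_OF_RANGE, FETCH_OFFSET_TOO_SMALL, FETCH_OFFSET_TOO_LARGE }

    private FetchProtocolSketch() {}

    /** Replica id a consumer would place in Fetch and ListOffsets requests. */
    static int replicaIdFor(boolean followerFetchingEnabled) {
        return followerFetchingEnabled ? FOLLOWER_ALLOWED_REPLICA_ID : LEADER_ONLY_REPLICA_ID;
    }

    /** Error a broker would return for the given fetch offset. */
    static FetchError validateFetchOffset(boolean supportsSpecificErrors, long fetchOffset,
                                          long logStartOffset, long logEndOffset) {
        boolean tooSmall = fetchOffset < logStartOffset;
        boolean tooLarge = fetchOffset > logEndOffset;
        if (!tooSmall && !tooLarge) {
            return FetchError.NONE;
        }
        // Older request versions only know the generic error code.
        if (!supportsSpecificErrors) {
            return FetchError.OFFSET_OUT_OF_RANGE;
        }
        return tooSmall ? FetchError.FETCH_OFFSET_TOO_SMALL : FetchError.FETCH_OFFSET_TOO_LARGE;
    }
}
```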

Consumer Changes

We will expose a new plugin interface that allows the user to provide custom logic to determine the preferred replica to fetch from.

interface ReplicaSelector extends Configurable {
    /**
     * Select the replica to use for fetching the given partition.
     * This method is invoked whenever a change to the ISR of that
     * partition is observed.
     *
     * Note that this is only a recommendation. If the broker does
     * not support follower fetching, then the consumer will fetch
     * from the leader.
     */
    Node select(TopicPartition partition, Node leader, List<Node> isr);

    /**
     * Optional custom configuration.
     */
    default void configure(Map<String, ?> configs) {}
}


This will be controlled through the replica.selector.class consumer configuration. By default, the consumer will retain the current behavior of fetching only from the leader. We will have a simple LeaderSelector implementation to enable this.

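class LeaderSelector implements ReplicaSelector {
    // Interface methods are implicitly public, so the implementation must be public too.
    @Override
    public Node select(TopicPartition partition, Node leader, List<Node> isr) {
        return leader;
    }
}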
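As an illustration of what a custom selector might look like, the sketch below prefers an in-sync replica in the consumer's own rack and otherwise falls back to the leader. To keep it compilable without the Kafka client jar, it uses stand-in types (`SketchNode`, `SketchReplicaSelector`) that only mirror the shape of the proposed interface. The configuration key `example.selector.rack` is hypothetical and is not part of this proposal.

```java
import java.util.List;
import java.util.Map;

// Stand-in for the broker node type; only the fields this sketch needs.
final class SketchNode {
    final int id;
    final String rack; // may be null when the broker has no rack configured

    SketchNode(int id, String rack) {
        this.id = id;
        this.rack = rack;
    }
}

// Stand-in that mirrors the shape of the proposed ReplicaSelector interface.
interface SketchReplicaSelector {
    SketchNode select(String topic, int partition, SketchNode leader, List<SketchNode> isr);

    default void configure(Map<String, ?> configs) {}
}

// Prefers a replica in the consumer's own rack, otherwise the leader.
final class SameRackSelector implements SketchReplicaSelector {
    static final String RACK_CONFIG = "example.selector.rack"; // hypothetical key

    private String localRack;

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get(RACK_CONFIG);
        localRack = value == null ? null : value.toString();
    }

    @Override
    public SketchNode select(String topic, int partition, SketchNode leader, List<SketchNode> isr) {
        // Without a configured rack, or when the leader is already local, keep the leader.
        if (localRack == null || localRack.equals(leader.rack)) {
            return leader;
        }
        for (SketchNode replica : isr) {
            if (localRack.equals(replica.rack)) {
                return replica;
            }
        }
        // No in-sync replica in the local rack.
        return leader;
    }
}
```

Falling back to the leader in every uncertain case keeps the selector consistent with the default behavior described above.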


Compatibility, Deprecation, and Migration Plan

This change is backwards compatible with previous versions. If the broker does not support follower fetching, we will revert to the old behavior of fetching from the leader.

Rejected Alternatives

  • We have considered allowing the consumer to use older versions of the Fetch API. The only complication is that older versions will not allow us to use the more specific out of range error codes. This complicates out of range handling because we do not know whether we should expect the out of range offset to exist on the leader or not. If the offset is too small on the follower, it may still be in range on the leader. One option is to always do offset reconciliation on the replica that the consumer is fetching from. The downside of this is that we cannot detect situations when the consumer has seen a later offset than a replica due to stale propagation of the ISR state. 
  • We have mentioned the difficulty of reasoning about the consistency of the log start offset due to the fact that retention is enforced independently by all replicas. One option we have considered is to only allow the leader to enforce retention and to rely only on the propagation of the log start offset in the Fetch response for follower deletion. In principle, this would give us the guarantee that the log start offset of a follower should always be less than or equal to that of the leader. However, unless we make log start offset acknowledgement a criterion for ISR membership, we cannot actually guarantee this. In the case of multiple leader elections occurring in a short time period, it is possible for this guarantee to be violated. Nevertheless, we think it may still be a good idea to rely on log start offset propagation for follower deletion, but we will consider this separately.

