...

We propose to extend the fetch protocol to allow consumer fetching from any replica. Just as when fetching from the leader, the replication protocol will guarantee that only committed data is returned to the consumer. This relies on propagation of the high watermark through the leader's fetch response. Due to this propagation delay, follower fetching may have higher latency. Additionally, follower fetching introduces the possibility of spurious out of range errors if a fetch is received by an out-of-sync replica. The diagram below will help to make this clearer.

[Diagram: the logs of the leader, an in-sync replica, and an out-of-sync replica, with records that are committed and locally available shown in green, and records that are either not available locally or not known to be committed shown in yellow]


In this diagram, the green portion of the log represents the records that are both available locally on the replica and known to be committed. The yellow portion represents the records which are either not available locally or not known to be committed (i.e., not yet known to have been successfully replicated to all members of the ISR). The leader always sees high watermark updates before any of the replicas. An in-sync replica may therefore have some committed data which is not yet available for consumption. In the case of an out-of-sync replica, the high watermark received from the leader is typically above its own log end offset, so there may be some committed data which is not yet locally available.

In general, a follower sees a stale view of partition metadata, including the current high watermark, the log start offset, and the current ISR. The implication of a lagging high watermark is that fetching from followers may have slightly higher latency for consumers. The implication of lagging log start offset and ISR state is that the consumer's offset may become out of range when changing the replica it is fetching from. In fact, there are currently no strong guarantees on the consistency of the log start offset between replicas (even those in the ISR), since retention is enforced independently by each replica. We deal with these points in more detail below:

High watermark propagation

Some extra latency is expected when fetching from followers, but we should ensure that there are no unnecessary delays in the propagation of the high watermark. Following the improvements in KIP-227, the leader can use the fetch session to track the high watermark of the follower. Currently, updating the follower's high watermark could be delayed by as long as replica.fetch.max.wait.ms if there is no new data written to the partition.

We propose to change the fetch satisfaction logic to take high watermark updates into account. The leader will respond to a fetch immediately if the follower has a stale high watermark. In particular, the first request in a fetch session will not block on the leader in order to ensure that the follower has the latest high watermark for requested partitions. A side benefit of this change is that it reduces the window following a leader election in which the high watermark can be observed to go backwards (see KIP-207).
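
As a rough illustration, the amended satisfaction check might look like the following sketch; the class and parameter names are illustrative and not actual broker internals.

Code Block
// A minimal sketch of the proposed fetch satisfaction logic, assuming the
// fetch session tracks the last high watermark sent to the follower.
// Names are illustrative, not actual broker internals.
class FetchSatisfaction {
    static boolean canCompleteImmediately(long accumulatedBytes, int minBytes,
                                          long elapsedMs, long maxWaitMs,
                                          long followerHighWatermark,
                                          long leaderHighWatermark) {
        // Existing conditions: enough data has accumulated or the max wait elapsed.
        if (accumulatedBytes >= minBytes || elapsedMs >= maxWaitMs)
            return true;
        // New condition: respond immediately if the follower's high watermark is
        // stale, so updates propagate without waiting up to replica.fetch.max.wait.ms.
        return followerHighWatermark < leaderHighWatermark;
    }
}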

Out of range handling

Due to the ISR change propagation delay mentioned above and the weak consistency of the log start offset, it is possible that an offset which is valid on one broker may be out of range on another. The client needs a way to reason about these errors so that it can reliably find the next offset to fetch from without weakening its delivery semantics.

The first improvement we make is to allow the consumer to distinguish whether an out of range offset was too low or too high. Generally, we have stronger guarantees when it comes to reasoning about the consistency of the log end offset, so it simplifies client handling to know which of these cases has occurred. The mechanism for this is described in case 3 below.

The main challenge for follower fetching is that a consumer may observe a committed offset before it is available on a follower. We need to define the expected behavior if the consumer attempts to fetch a valid offset from a follower which is relying on stale log metadata.

There are four cases to consider (each identified in the diagram above):

Case 1 (uncommitted offset): The offset is available locally, but is not known to be committed. In fact, the broker already has logic to handle this case. When a replica is elected leader, it does not know what the true value of the high watermark should be until it receives fetches from the current ISR. If a consumer fetches at an offset between the last updated high watermark value and the log end offset, then the broker currently returns an empty record set. This ensures that no records are returned until they are known to be committed.

A similar problem was addressed in KIP-207. Prior to this, a leader which was just elected might return a stale value of the high watermark. With the adoption of KIP-207, the broker instead returns the OFFSET_NOT_AVAILABLE error code until the true high watermark can be determined. This is slightly inconsistent with the fetch behavior. The unintended side effect of returning an empty record set (and no error) is that it might expose an offset before it is known to be committed.

One of the improvements we make in this KIP is to make the behavior of these two APIs consistent. When the broker receives a fetch for an offset between the local high watermark and the log end offset, whether it's on the leader or follower, we will return the OFFSET_NOT_AVAILABLE error code. The consumer will handle this by retrying.
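
To make the resulting broker behavior concrete, here is a minimal sketch of the offset checks for a consumer fetch, covering the four cases in the diagram; the names are illustrative and not actual broker internals.

Code Block
// Sketch of broker-side handling of a consumer fetch offset. On an
// out-of-sync replica, highWatermark is the last value received from the
// leader and may exceed the local log end offset.
class FetchOffsetCheck {
    enum Result { OK, OFFSET_NOT_AVAILABLE, OFFSET_OUT_OF_RANGE }

    static Result check(long fetchOffset, long logStartOffset,
                        long highWatermark, long logEndOffset) {
        if (fetchOffset < logStartOffset)
            return Result.OFFSET_OUT_OF_RANGE;     // case 3: offset too small
        if (fetchOffset > logEndOffset && fetchOffset > highWatermark)
            return Result.OFFSET_OUT_OF_RANGE;     // case 4: offset too large
        if (fetchOffset > highWatermark || fetchOffset > logEndOffset)
            return Result.OFFSET_NOT_AVAILABLE;    // cases 1 and 2: consumer retries
        return Result.OK;                          // committed and locally available
    }
}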


Case 2 (unavailable offset): The offset is not available locally, but is known to be committed. This can occur if a fetch is received by an out-of-sync replica. Even an out-of-sync replica will continue fetching from the leader, and it may have received a high watermark value which is larger than its log end offset. So a fetch at an offset between the log end offset and the high watermark received from the leader is known to be committed even though the replica does not have the data.

In general, consumers should fetch only from the ISR, but it is not possible to prevent fetching from outside the ISR because it takes time for the ISR state to propagate to each replica. We propose here to also return OFFSET_NOT_AVAILABLE for this case, which will cause the consumer to retry. In the common case, we expect the out-of-sync replica to return to the ISR after a short time. On the other hand, if the replica remains out-of-sync for a longer duration, the consumer will have a chance to find a new replica after its metadata refresh. We discuss this point in more detail below in the section on how the consumer finds its preferred replica.


Case 3 (offset too small): A valid offset on one replica is smaller than the log start offset on another replica. To detect this case, we propose a minor change to the fetch response. When returning an OFFSET_OUT_OF_RANGE error, the broker will provide its current log start offset and high watermark. The consumer can compare these values with the fetch offset to determine whether the offset was too small or too large.
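
For example, the consumer-side classification could be as simple as the following sketch (illustrative names):

Code Block
// Sketch of how the consumer classifies an OFFSET_OUT_OF_RANGE error using
// the log start offset and high watermark included in the fetch response.
class OutOfRangeClassifier {
    enum Cause { OFFSET_TOO_SMALL, OFFSET_TOO_LARGE }

    static Cause classify(long fetchOffset, long replicaLogStartOffset,
                          long replicaHighWatermark) {
        if (fetchOffset < replicaLogStartOffset)
            return Cause.OFFSET_TOO_SMALL;   // case 3
        // Otherwise the fetch offset must be above the replica's high
        // watermark, or the fetch would not have failed.
        return Cause.OFFSET_TOO_LARGE;       // case 4
    }
}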

Reasoning about this case on the client is difficult due to the absence of strong guarantees on the consistency of the log start offset: an offset that is valid on one replica may fall below the log start offset of another after switching replicas. However, we expect the log start offsets of replicas to remain relatively close, so the benefit of any additional handling seems marginal. Here we propose to skip the offset reconciliation outlined in KIP-320 when handling this error and simply resort to the offset reset policy.

Note that when the offset reset policy is set to "earliest," we need to find the log start offset of the replica that we are fetching from. The log start offset of the leader is not guaranteed to be present on the follower, and we do not want the consumer to get stuck in a reset loop while waiting for convergence. So to simplify handling, the consumer will use the earliest offset of whichever replica it is fetching from.

In summary, to handle this case, the consumer will take the following steps (a sketch follows the list):

  1. If the reset policy is "earliest," fetch the log start offset of the current replica that raised the out of range error.
  2. If the reset policy is "latest," fetch the log end offset from the leader.
  3. If the reset policy is "none," raise an exception.
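
Here is a minimal sketch of this reset handling, assuming a listOffset helper that sends a ListOffsets request to a specific broker; the helper and class names are illustrative, not actual client APIs.

Code Block
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

// Sketch of the reset handling when the fetch offset is too small.
class OffsetTooSmallHandler {
    private static final long EARLIEST = -2L; // ListOffsets sentinel timestamps
    private static final long LATEST = -1L;

    long reset(String resetPolicy, Node currentReplica, Node leader, TopicPartition tp) {
        switch (resetPolicy) {
            case "earliest":
                // Query the replica we are fetching from, not the leader, to
                // avoid a reset loop while log start offsets converge.
                return listOffset(currentReplica, tp, EARLIEST);
            case "latest":
                return listOffset(leader, tp, LATEST);
            default: // "none"
                throw new NoOffsetForPartitionException(tp);
        }
    }

    // Placeholder: send a ListOffsets request to the given broker.
    private long listOffset(Node broker, TopicPartition tp, long timestamp) {
        throw new UnsupportedOperationException("omitted in this sketch");
    }
}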


Case 4 (offset too large): The offset is not available locally on the replica and is not known to be committed. This can happen, as in case 2, if an out-of-sync replica receives a fetch at an offset which is larger than the last high watermark value it received from the leader. Due to metadata propagation delays, this error may be spurious: when a leader shrinks the ISR, the controller is notified through Zookeeper and then sends LeaderAndIsr requests to the replicas of the affected partitions. Until these requests are received, a follower will not know that it is out of sync and will still respond to fetch requests. (This is similar to the problem of a stale leader responding to fetch requests before it realizes that the controller has elected another leader.) The replica has no choice in this case but to return OFFSET_OUT_OF_RANGE. In general, when fetching from followers, out of range errors cannot be taken as definitive. Before raising an error, the consumer must validate whether the offset is still valid.

In fact, the logic to handle this case is already outlined in KIP-320. The consumer will reconcile its current offset with the leader and then will resume fetching. The only change we make here is to request a metadata update if we find that the current offset is still valid on the leader. This gives us a chance to detect any ISR changes before we resume fetching from the same replica.

It may not be immediately obvious that the OffsetsForLeaderEpoch API can be used to determine whether an offset is still valid. The consumer will request from the leader the end offset for the epoch corresponding to its current fetch offset. If the fetch offset has been truncated following an unclean leader election, then the response will contain either 1) the same epoch with an end offset smaller than the fetch offset, or 2) a smaller epoch and its corresponding end offset. In either case, the returned end offset will be smaller than the fetch offset. Otherwise, if the offset still exists, the response will include the same epoch and an end offset which is larger than the fetch offset.
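
The check itself reduces to a simple comparison, as in the following sketch, where EpochEndOffset stands in for the epoch and end offset returned in the OffsetsForLeaderEpoch response:

Code Block
// Sketch of the truncation check using the OffsetsForLeaderEpoch response.
class TruncationCheck {
    static class EpochEndOffset {
        final int leaderEpoch;   // epoch returned by the leader
        final long endOffset;    // end offset of that epoch

        EpochEndOffset(int leaderEpoch, long endOffset) {
            this.leaderEpoch = leaderEpoch;
            this.endOffset = endOffset;
        }
    }

    /**
     * The fetch offset has been truncated if the returned end offset is
     * smaller than the fetch offset (whether with the same epoch or a
     * smaller one). Otherwise the offset still exists on the leader.
     */
    static boolean truncated(long fetchOffset, EpochEndOffset response) {
        return response.endOffset < fetchOffset;
    }
}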

So to handle the case of an offset which is too large, the consumer will take the following steps:

  1. Use the OffsetsForLeaderEpoch API to verify the current position with the leader.
    1. If the fetch offset is still valid, refresh metadata and continue fetching.
    2. If truncation was detected, follow the steps in KIP-320 to either reset the offset or raise the truncation error.
    3. Otherwise, follow the same steps as in case 3, depending on the reset policy.

If we do not have an epoch corresponding to the current fetch offset, then we will skip the truncation check and use the same handling as in case 3 (basically we resort to the reset policy). The full flow is sketched below. In practice, out of range errors due to ISR propagation delays should be extremely rare, because they require a consumer to observe a committed offset before a follower which is still considered in the ISR.
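
Putting the pieces together, the overall handling for this case might look like the following sketch; the helper methods are placeholders for the OffsetsForLeaderEpoch round trip, the KIP-320 truncation handling, the case 3 reset logic, and a metadata refresh, and all names are illustrative.

Code Block
import java.util.Optional;
import org.apache.kafka.common.TopicPartition;

// Sketch of end-to-end consumer handling for case 4 (offset too large).
class OffsetTooLargeHandler {
    void handle(TopicPartition tp, Optional<Integer> fetchEpoch, long fetchOffset) {
        if (!fetchEpoch.isPresent()) {
            resetOffset(tp);               // no epoch: skip the truncation check
            return;
        }
        long leaderEndOffset = endOffsetForEpoch(tp, fetchEpoch.get());
        if (leaderEndOffset < fetchOffset) {
            handleTruncation(tp);          // follow KIP-320: reset or raise
        } else {
            // Offset is still valid on the leader. Refresh metadata so any
            // ISR change is seen before fetching from the same replica again.
            requestMetadataUpdate();
        }
    }

    private long endOffsetForEpoch(TopicPartition tp, int epoch) {
        throw new UnsupportedOperationException("OffsetsForLeaderEpoch omitted");
    }
    private void handleTruncation(TopicPartition tp) { /* KIP-320 handling */ }
    private void resetOffset(TopicPartition tp) { /* case 3 reset logic */ }
    private void requestMetadataUpdate() { /* trigger a metadata refresh */ }
}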

...

TLDR: The only changes here are the following:

  1. The broker will return OFFSET_NOT_AVAILABLE if an offset is known to exist, but is either not available or not known to be committed.
  2. For out of range errors, the broker will return its current log start offset and high watermark in the fetch response, which the consumer can use to determine whether the fetch offset was too small or too large.
  3. We will skip the offset truncation check from KIP-320 if the fetch offset is smaller than the log start offset.
  4. When resetting to the earliest offset, we need to query the replica that we will be fetching from.

...

It is still necessary, however, to bump the version of the Fetch request because of the use of the new OFFSET_NOT_AVAILABLE error code mentioned above. The request and response schemas will not change. We will also modify the behavior of all fetch versions so that the current log start offset and high watermark are returned if the fetch offset is out of range. This allows the consumer to determine whether the fetch offset was too small or too large.

One point that was missed in KIP-320 is the fact that the OffsetsForLeaderEpoch API exposes the log end offset. For consumers, the log end offset should be the high watermark, so we need a way for this protocol to distinguish replica and consumer requests. We propose here to add the same `replica_id` field that is used in the Fetch and ListOffsets APIs. The new schema is provided below:

Code Block
OffsetsForLeaderEpochRequest => [Topic]
  ReplicaId => INT32              // New (-1 means consumer)
  Topic => TopicName [Partition]
    TopicName => STRING
    Partition => PartitionId CurrentLeaderEpoch LeaderEpoch
      PartitionId => INT32
      CurrentLeaderEpoch => INT32
      LeaderEpoch => INT32


Consumer Changes

We will expose a new plugin interface that allows the user to provide custom logic to determine the preferred replica to fetch from.

Code Block
interface ReplicaSelector extends Configurable, Closeable {
    /**
     * Select the replica to use for fetching the given partition.
     * This method is invoked whenever a change to the ISR of that
     * partition is observed.
     *
     * Note that this is only a recommendation. If the broker does
     * not support follower fetching, then the consumer will fetch
     * from the leader.
     */
    Node select(TopicPartition partition, Node leader, PartitionInfo partitionInfo);

    /**
     * Optional custom configuration.
     */
    default void configure(Map<String, ?> configs) {}

    /**
     * Optional shutdown hook.
     */
    default void close() {}
}


This will be controlled through the replica.selector.class consumer configuration. By default, the consumer will retain the current behavior of fetching only from the leader. We will have a simple LeaderSelector implementation to enable this.
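
For illustration, here is a minimal sketch of the LeaderSelector default along with a hypothetical rack-aware selector implementing the interface above. The client.rack configuration key is an assumption for this example and is not part of the proposal.

Code Block
import java.util.Map;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// The default selector preserves the current behavior.
class LeaderSelector implements ReplicaSelector {
    @Override
    public Node select(TopicPartition partition, Node leader, PartitionInfo partitionInfo) {
        return leader;
    }
}

// Hypothetical example: prefer an in-sync replica in the consumer's rack.
class RackAwareSelector implements ReplicaSelector {
    private String rack;

    @Override
    public void configure(Map<String, ?> configs) {
        this.rack = (String) configs.get("client.rack"); // assumed config key
    }

    @Override
    public Node select(TopicPartition partition, Node leader, PartitionInfo partitionInfo) {
        if (rack != null) {
            for (Node replica : partitionInfo.inSyncReplicas()) {
                if (rack.equals(replica.rack()))
                    return replica;
            }
        }
        return leader; // fall back to the leader if no replica matches
    }
}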

...