
Status

Current state: Draft

Discussion thread: n/a

...

POC: https://github.com/apache/kafka/pull/9616



Motivation

When Streams is processing a task with multiple inputs, each time it is ready to process a record, it has to choose which input to process next. It always takes from the input whose next record has the least timestamp. As a result, Streams processes data in timestamp order. However, if the buffer for one of the inputs is empty, Streams doesn't know what timestamp the next record for that input will have.
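A simplified model of this choice may help. The sketch below is not Streams' internal code: the class `TimestampChooser` and its methods are hypothetical, and partitions are identified by plain strings. It picks the input whose head record has the least timestamp and shows how an empty buffer leaves that choice undetermined.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of one task's per-partition buffers (illustration only).
final class TimestampChooser {
    // Buffered record timestamps per input partition, in offset order.
    private final Map<String, Deque<Long>> buffers = new LinkedHashMap<>();

    void addInput(String partition) {
        buffers.putIfAbsent(partition, new ArrayDeque<>());
    }

    void buffer(String partition, long timestamp) {
        buffers.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(timestamp);
    }

    // True if some input has nothing buffered, so its next timestamp is unknown.
    boolean hasEmptyBuffer() {
        return buffers.values().stream().anyMatch(Deque::isEmpty);
    }

    // Picks the non-empty input whose head record has the least timestamp.
    Optional<String> nextPartition() {
        String best = null;
        long bestTs = Long.MAX_VALUE;
        for (Map.Entry<String, Deque<Long>> e : buffers.entrySet()) {
            Long head = e.getValue().peekFirst();
            if (head != null && (best == null || head < bestTs)) {
                best = e.getKey();
                bestTs = head;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

When `hasEmptyBuffer()` is true, `nextPartition()` can only choose among the inputs it can see, so a record with an earlier timestamp may still be sitting on the broker for the empty input.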

...

UC3: I would like to select among the inputs in timestamp order, but if some of the partitions are empty on the brokers, I would like to wait a configured amount of time.

Proposed Solution

Streams Config change

We propose to alter the max.task.idle.ms config to guarantee that Streams waits for fetches from the empty partitions before it starts counting against the timeout. This preserves the meaning of the config (how long to wait for more data to arrive for empty partitions before proceeding), but changes its behavior. Rather than starting to count against the timeout as soon as we discover an empty local buffer, we will only count against the timeout when the lag is zero. That is, if the lag is not known, we wait for a fetch, and if the lag is non-zero, we wait to poll the data. Thus, the configuration controls how long to wait for new data to be produced on the topic, regardless of fetch dynamics between the consumer and the brokers.
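The waiting rule can be sketched as a small state machine for a single empty partition. This is a hedged illustration rather than the actual Streams implementation: the `IdleDecision` class and its `Action` values are hypothetical, and resetting the idle clock whenever the lag becomes unknown or positive is an assumption.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the proposed max.task.idle.ms semantics for one empty partition.
final class IdleDecision {
    enum Action { WAIT_FOR_FETCH, WAIT_FOR_POLL, IDLE, PROCESS }

    // Time at which the lag was first observed to be zero; -1 means the idle clock is stopped.
    private long idleStartMs = -1L;

    // lag: last known lag of the empty partition, or empty if no fetch has reported it yet.
    Action decide(OptionalLong lag, long nowMs, long maxTaskIdleMs) {
        if (!lag.isPresent()) {
            idleStartMs = -1L;              // unknown lag: wait for a fetch, do not count time
            return Action.WAIT_FOR_FETCH;
        }
        if (lag.getAsLong() > 0L) {
            idleStartMs = -1L;              // data exists on the broker: wait to poll it
            return Action.WAIT_FOR_POLL;
        }
        // Lag is zero: only now does time count against the configured timeout.
        if (idleStartMs < 0L) {
            idleStartMs = nowMs;
        }
        return nowMs - idleStartMs >= maxTaskIdleMs ? Action.PROCESS : Action.IDLE;
    }
}
```

With the default of 0, the first observation of zero lag returns `PROCESS` immediately, so the only waiting that remains is for the fetch that establishes the lag.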

...

  • UC1 can be satisfied with the configuration value of "max.task.idle.ms: -1", which indicates that Streams will never wait to buffer extra data, but always choose from what it already has buffered locally.
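A hypothetical helper shows how this value space might be read from a plain `java.util.Properties` object. The class name `TaskIdleConfig` is illustrative, and rejecting values below -1 is an assumption rather than something this proposal specifies.

```java
import java.util.Properties;

// Hypothetical interpretation of the proposed "max.task.idle.ms" value space.
final class TaskIdleConfig {
    static final String MAX_TASK_IDLE_MS = "max.task.idle.ms";

    // -1 disables idling: always choose from what is already buffered locally (UC1).
    static boolean idlingDisabled(Properties props) {
        return maxTaskIdleMs(props) == -1L;
    }

    // 0 (the default) or a positive value: wait for fetches of empty partitions,
    // then idle up to this many ms once their lag is known to be zero.
    static long maxTaskIdleMs(Properties props) {
        long value = Long.parseLong(props.getProperty(MAX_TASK_IDLE_MS, "0"));
        if (value < -1L) {
            // Assumption: values below -1 are treated as invalid.
            throw new IllegalArgumentException(MAX_TASK_IDLE_MS + " must be >= -1, got " + value);
        }
        return value;
    }
}
```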

Consumer change

While analyzing possible implementations, we determined that the current Consumer API does not offer a deterministic way to know whether a specific partition has been fetched. Right now, callers can only observe when a poll returns some data for a partition. If poll does not return data from a partition, the consumer might have fetched it and received no records, or it might not have fetched it (yet). Therefore, we also propose to alter Consumer#poll to supply this information.

...

Streams will be able to use this new method by maintaining internal records of which partitions have been fetched, what the lag was at each fetch, and when each fetch took place. Each time poll() returns, we will store the receivedTimestamp and lag for each partition. Then, when deciding whether a task is ready to process, if a partition has no buffered data, we consult this local metadata for its lag. If the lag is missing or too stale, we wait for a fetch to refresh it. If the lag is greater than zero, we know there is data on the brokers, and we wait to poll it before processing anything. If the lag is zero, we apply the config and idle for the configured amount of time before enforcing processing.
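The bookkeeping described above might look roughly like the following sketch. It assumes partitions keyed by plain strings and a hypothetical staleness bound `maxLagAgeMs`; the proposal does not say how old a lag value must be to count as "too stale", and `FetchedLagTracker` is not a real Streams class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical per-task record of the last fetch metadata seen for each partition.
final class FetchedLagTracker {
    private static final class Entry {
        final long receivedTimestamp;
        final long lag;

        Entry(long receivedTimestamp, long lag) {
            this.receivedTimestamp = receivedTimestamp;
            this.lag = lag;
        }
    }

    private final Map<String, Entry> lastFetch = new HashMap<>();
    private final long maxLagAgeMs; // hypothetical staleness bound

    FetchedLagTracker(long maxLagAgeMs) {
        this.maxLagAgeMs = maxLagAgeMs;
    }

    // Called for each partition in the metadata returned by poll().
    void onPoll(String partition, long receivedTimestamp, long lag) {
        lastFetch.put(partition, new Entry(receivedTimestamp, lag));
    }

    // Lag of an empty partition, or empty if it was never fetched or the value is too stale.
    OptionalLong usableLag(String partition, long nowMs) {
        Entry e = lastFetch.get(partition);
        if (e == null || nowMs - e.receivedTimestamp > maxLagAgeMs) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(e.lag);
    }
}
```

The resulting `OptionalLong` maps directly onto the three cases in the paragraph: empty means wait for a fetch, a positive value means wait to poll, and zero means start counting against max.task.idle.ms.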

Public Interfaces Affected

  • Change "max.task.idle.ms" to accept "-1" as a flag to disable task idling entirely
  • Change the semantics of the default of "max.task.idle.ms" ("0") so that Streams will deterministically fetch all available partitions before deciding to proceed with enforced processing.
  • Add ConsumerRecords#metadata(): Map<TopicPartition, Metadata>, where Metadata includes receivedTimestamp, position, lag, startOffset, and endOffset information

Proposed Changes

We propose to alter the config value space of "max.task.idle.ms" in the following way:

...

public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {

    ...

    public static final class Metadata {
        public long receivedTimestamp()
        public long position()
        public long lag()
        public long startOffset()
        public long endOffset()
    }
    
    ...

    public Map<TopicPartition, Metadata> metadata()

    ...

}
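To make the proposed accessors concrete, here is a self-contained stand-in for `ConsumerRecords.Metadata` that compiles without Kafka on the classpath. `PartitionMetadata`, `MetadataUsage`, and the definition of lag as `endOffset - position` are assumptions for illustration. The sketch also assumes that a partition appears in the metadata map whenever a fetch response for it was received, even if that response carried no records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed ConsumerRecords.Metadata (not a Kafka class).
final class PartitionMetadata {
    private final long receivedTimestamp;
    private final long position;
    private final long startOffset;
    private final long endOffset;

    PartitionMetadata(long receivedTimestamp, long position, long startOffset, long endOffset) {
        this.receivedTimestamp = receivedTimestamp;
        this.position = position;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    long receivedTimestamp() { return receivedTimestamp; }
    long position() { return position; }
    long startOffset() { return startOffset; }
    long endOffset() { return endOffset; }

    // Assumption: lag is the distance from the consumer position to the end offset.
    long lag() { return Math.max(0L, endOffset - position); }
}

final class MetadataUsage {
    // Absent from the map: no fetch response yet, so the lag is unknown.
    static boolean wasFetched(Map<String, PartitionMetadata> metadata, String partition) {
        return metadata.containsKey(partition);
    }

    // Fetched partitions whose lag is zero, i.e. nothing more to poll right now.
    static List<String> caughtUp(Map<String, PartitionMetadata> metadata) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, PartitionMetadata> e : metadata.entrySet()) {
            if (e.getValue().lag() == 0L) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```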


Compatibility, Deprecation, and Migration Plan

Streams Config Change

After this change, Streams will offer more intuitive join semantics by default, at the expense of occasionally pausing processing when it hasn't received updated lag metadata from the brokers in a while. We expect this to be rare, since the brokers send back as much metadata as they have available in each fetch response, so we do not anticipate a performance degradation under normal circumstances.

ConsumerRecords#metadata addition

All of the methods in ConsumerRecords will continue to behave in exactly the same way, so no existing use cases are affected. The changes proposed herein are pure additions. Also, the proposed interface can easily be expanded to include other metadata if desired in the future.

Rejected Alternatives

Streams Config Change

We could instead propose a new config with the desired semantics and deprecate the existing one. We judged it to be simpler for both users and maintainers to keep the config the same, since the semantics are so close to the existing ones. We are taking the perspective that the current "semantics" are really just a buggy implementation of the semantics we intended to provide in the first place.

Exposing fetched partitions in ConsumerRecords

We considered simply exposing in ConsumerRecords which partitions had been fetched during the poll, either by including empty-fetched partitions in `partitions()` or by adding a new method to expose the fetched partitions. The theory was that when the consumer gets an empty fetch response back from the brokers, it means that the partition in question has no more records to fetch. However, it turns out that the broker may also return empty responses for other reasons, such as:

...