
Current state: Draft

Discussion thread: n/a




When Streams is processing a task with multiple inputs, each time it is ready to process a record, it has to choose which input to process next. It always takes from the input for which the next record has the least timestamp. The result of this is that Streams processes data in timestamp order. However, if the buffer for one of the inputs is empty, Streams doesn't know what timestamp the next record for that input will be.

Streams introduced a configuration "" in KIP-353 to address this issue ( ).

The config allows Streams to wait some amount of time for data to arrive on the empty input, so that it can make a timestamp-ordered decision about which input to pull from next.

However, this config can be hard to use reliably and efficiently, since what we're really waiting for is the next poll that would return data from the empty input's partition, and this guarantee is a function of the poll interval, the max poll interval, and the internal logic that governs when Streams will poll again.

In the situation of a task with two (or more) inputs, there are three use cases:

UC1: I do not care about selecting among the inputs in order. I never want to wait to fetch data from empty inputs as long as I have at least one non-empty input. I just want to process records from any input as soon as they are available.

UC2: I would like to try and select among the inputs in order, but if some of the partitions are empty on the brokers , I don't want to wait for the producers to send more data.

UC3: I would like to try and select among the inputs in order, but if some of the partitions are empty on the brokers , I would like to wait some configured amount of time.

Proposed Solution

Streams Config change

We propose to alter the config to guarantee to wait for fetches from the empty partitions before actualizing the timeout. This preserves the meaning of the parameter (how long to wait for more data to arrive for empty partitions before proceeding), but changes its behavior. Rather than starting to count against the timeout as soon as we discover an empty local buffer, we will only count against the timeout when the lag is zero. I.e., if the lag is not known, then we have to wait for a fetch, and if the lag is non-zero, then we wait to poll the data. Thus, the configuration controls how long to wait for new data to be produced on the topic, regardless of fetch dynamics between the consumer and the brokers.

With this proposal:

Note that UC1 is not satisfiable with this alone, so we also propose to add an extra config value of "-1", which indicates that the task idling feature itself should be disabled. So:

Consumer change

Finally, while analyzing possible implementations, we have determined that the current Consumer API does not offer a deterministic way to know whether a specific partition has been fetched. Right now, callers can only observe when a poll returns some data for a partition. If poll does not return data from a partition, it might have fetched and received no records, or it might not have fetched (yet). Therefore, we also prose to alter Consumer#poll to supply this information.

In the current API, Consumer#poll returns a ConsumerRecords instance, which is an Iterable collection of ConsumerRecord instances, and which also offers a ConsumerRecords#partitions method to list the partitions that are represented in the results. This list of partitions does not currently include partitions that were fetched but contain no results.

Internally, when the consumer handles fetch responses from the brokers, it also receives metadata about the end offset of the partition(s), which it uses to update its metrics, including specifically a metric indicating the current lag. Theoretically, this could supply the information we need, except that we don't know how old those metrics are. I.e., for deterministic semantics, we need to wait for new fetches and  to know the lag information as of those fetches.

We propose to expose this fetched metadata in a new method on ConsumerRecords: ConsumerRecords#metadata(): Map<TopicPartition, Metadata>, where Metadata includes position, lag, and endOffset information.

Streams will be able to use this new method by maintaining internal flags of which partitions have been fetched, what the lag was at each fetch, and when the fetches took place.

Public Interfaces Affected

No Java interface changes, only:

Proposed Changes

We propose to alter the config value space of "" in the following way:

We also propose the following additions to the ConsumerRecords interface:

public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {


    public static final class Metadata {
        public long position()
        public long lag()
        public long endOffset()

    public Map<TopicPartition, Metadata> metadata()



Compatibility, Deprecation, and Migration Plan

Streams Config Change

The config change is mostly  backward compatible. The default behavior stays the same, because we change the default value from "0ms" to "-1 (disable idling)".

If users currently set a value for idling, they will observe upon upgrade that Streams starts to wait longer than before. This is not a strict violation of the old configuration, because Streams didn't previously promise to strictly adhere to the configured time, but rather just idle at least  that long. From discussions with users on the mailing list and elsewhere, including in , we believe that most users already would have assumed (or at least prefer) that we actually wait for fetches before enforcing the timeout. Thus, although the behavior will change, it will hopefully be a beneficial change.

The most concerning case for backward behavioral compatibility is if users have already set the config to "0ms" (overriding the default with the same value). If these users wished to indicate that they never want Streams to wait on empty partitions, then they may be dismayed to see Streams starting to wait. They will have to change the config to "-1 (disable idling)" or just unset the config, since the default behavior is not to wait anyway. There doesn't seem to be any obvious way to overcome this, except to deprecate the existing configuration and replace it with a new one. If this consideration seems significant to reviewers, this is exactly what I would propose to do instead.

ConsumerRecords#metadata addition

All of the methods in ConsumerRecords will continue to behave in exactly the same way, so no existing use cases are affected. The changes proposed herein are pure additions. Also,the proposed interface can easily be expanded to include other metadata if desired in the future.

Rejected Alternatives

Streams Config Change

We could instead propose a new config with the desired semantics and deprecate the existing one. We judged it to be simpler for both users and maintainers to keep the config the same, since the semantics are so close to the existing ones. We are taking the perspective that the current "semantics" are really just a bugged implementation of the semantics we desired to provide in the first place.

Exposing fetched partitions in ConsumerRecords

We considered simply exposing which partitions have been fetched within the poll in ConsumerRecords. Either by adding empty-fetched partitions in `partitions()`, or by adding a new method to expose the partitions that had been fetched. The theory was that when the consumer gets an empty fetch response back from the brokers, it means that the partition in question has no more records to fetch. However, it turns out that the broker may also return empty responses for other reasons, such as: