...

We propose to alter the max.task.idle.ms config so that Streams is guaranteed to wait for fetches from the empty partitions before enforcing the timeout. This preserves the meaning of the config (how long to wait for more data to arrive for empty partitions before proceeding), but changes its behavior. Rather than starting to count against the timeout as soon as we discover an empty local buffer, we will only count against the timeout when the lag is zero. That is, if the lag is not known, then we have to wait for a fetch, and if the lag is non-zero, then we wait to poll the data. Thus, the configuration controls how long to wait for new data to be produced on the topic, regardless of fetch dynamics between the consumer and the brokers.

...

  • UC1 can be satisfied with the configuration value of "max.task.idle.ms: -1", which indicates that Streams will never wait to buffer extra data, but always choose from what it already has buffered locally.

Consumer change

While analyzing possible implementations, we have determined that the current Consumer API does not offer a deterministic way to know whether a specific partition has been fetched. Right now, callers can only observe when a poll returns some data for a partition. If poll does not return data from a partition, it might have fetched and received no records, or it might not have fetched (yet). Therefore, we also propose to alter Consumer#poll to supply this information.

In the current API, Consumer#poll returns a ConsumerRecords instance, which is an Iterable collection of ConsumerRecord instances, and which also offers a ConsumerRecords#partitions method to list the partitions that are represented in the results. This list of partitions does not currently include partitions that were fetched but contain no results.

Internally, when the consumer handles fetch responses from the brokers, it also receives metadata about the end offset of the partition(s), which it uses to update its metrics, including specifically a metric indicating the current lag. Theoretically, this could supply the information we need, except that we don't know how old those metrics are. To provide the desired semantics, we would like to use the lag information only if it is not too stale.

We propose to expose this fetched metadata in a new method on ConsumerRecords: ConsumerRecords#metadata(): Map<TopicPartition, Metadata>, where Metadata includes the receivedTimestamp, position, lag, startOffset, and endOffset information.

Streams will be able to use this new method by maintaining internal flags of which partitions have been fetched, what the lag was at each fetch, and when the fetches took place. Essentially, each time we get a response back from poll(), we will persist the receivedTimestamp and lag for each partition. Then, when it comes time to decide whether we are ready to process, if we don't have data buffered for a partition, then we can consult this local metadata to see what the lag is. If the lag is missing or too stale, we would wait to fetch the metadata. If the lag is greater than zero, then we know there is data on the brokers, and we should wait to poll it before processing anything. If the lag is zero, then we can apply the config to idle the desired amount of time before enforcing processing.
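The decision procedure described above can be sketched as follows. This is only an illustration under stated assumptions, not the Streams implementation: the class name TaskIdleSketch, the Decision enum, the maxMetadataAgeMs staleness bound, and the use of a plain String in place of TopicPartition are all hypothetical, chosen so that the example compiles without the Kafka client library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the readiness check for a partition with no locally buffered data.
final class TaskIdleSketch {
    enum Decision { WAIT_FOR_FETCH, WAIT_FOR_POLL, IDLE, PROCESS }

    // Stand-in for the lag and timestamp that would be recorded after each poll().
    private static final class FetchedLag {
        final long receivedTimestampMs;
        final long lag;

        FetchedLag(long receivedTimestampMs, long lag) {
            this.receivedTimestampMs = receivedTimestampMs;
            this.lag = lag;
        }
    }

    private final long maxTaskIdleMs;     // value of max.task.idle.ms (-1 disables idling)
    private final long maxMetadataAgeMs;  // assumed bound on how old lag metadata may be
    private final Map<String, FetchedLag> lagByPartition = new HashMap<>();
    private final Map<String, Long> idleStartMs = new HashMap<>();

    TaskIdleSketch(long maxTaskIdleMs, long maxMetadataAgeMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
        this.maxMetadataAgeMs = maxMetadataAgeMs;
    }

    // Called for every partition that appears in the metadata of a poll() result.
    void recordFetch(String partition, long receivedTimestampMs, long lag) {
        lagByPartition.put(partition, new FetchedLag(receivedTimestampMs, lag));
        if (lag > 0) {
            idleStartMs.remove(partition); // data exists on the broker, so stop idling
        }
    }

    // Decides what to do about a partition that has nothing buffered locally.
    Decision decide(String emptyPartition, long nowMs) {
        if (maxTaskIdleMs < 0) {
            return Decision.PROCESS; // idling disabled: use whatever is buffered
        }
        FetchedLag fetched = lagByPartition.get(emptyPartition);
        if (fetched == null || nowMs - fetched.receivedTimestampMs > maxMetadataAgeMs) {
            return Decision.WAIT_FOR_FETCH; // lag unknown or too stale
        }
        if (fetched.lag > 0) {
            return Decision.WAIT_FOR_POLL; // records exist but have not been polled yet
        }
        // Lag is zero: only now does time start counting against the timeout.
        long start = idleStartMs.computeIfAbsent(emptyPartition, p -> nowMs);
        return nowMs - start >= maxTaskIdleMs ? Decision.PROCESS : Decision.IDLE;
    }
}
```

In this sketch the idle timer starts at the first check that observes zero lag, so time spent waiting for a fetch or for a poll never counts against max.task.idle.ms.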

Public Interfaces Affected

...

  • Change "max.task.idle.ms" to accept "-1" as a flag to disable task idling entirely
  • Change the semantics of the default of "max.task.idle.ms" ("0") so that Streams will deterministically fetch all available partitions before deciding to proceed with enforced processing.
  • Add ConsumerRecords#metadata(): Map<TopicPartition, Metadata>, where Metadata includes receivedTimestamp, position, lag, startOffset, and endOffset information

Proposed Changes

...

  • Current:
    • negative integer: disallowed
    • 0 (default): indicates that Streams will never wait to buffer empty partitions before choosing the next record by timestamp
    • positive integer: indicates the amount of wall-clock time on the client to wait for more data to arrive on empty partitions (but no guarantee that the client will actually fetch again before the timeout expires).
  • Proposed:
    • negative integer other than -1: disallowed
    • -1: indicates that Streams will never wait to buffer empty partitions before choosing the next record by timestamp
    • 0 (default): indicates that Streams will not wait for more data to be produced to empty partitions (partitions that are not buffered locally and also have zero lag). Doesn't count the time Streams has to wait to actually fetch those partitions' lags.
    • positive integer: Indicates the amount of time Streams will wait for more data to be produced to empty partitions. Doesn't count the time Streams has to wait to actually fetch those partitions' lags.
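As a rough illustration of the proposed value ranges, the sketch below validates a "max.task.idle.ms" setting and classifies it. The class MaxTaskIdleConfigSketch, the Mode enum, and the fallback of "0" when the key is unset are assumptions made for this example; it uses a plain java.util.Properties rather than the real Streams configuration classes.

```java
import java.util.Properties;

// Hypothetical helper that mirrors the proposed rules for max.task.idle.ms.
final class MaxTaskIdleConfigSketch {
    static final String MAX_TASK_IDLE_MS = "max.task.idle.ms";

    enum Mode {
        DISABLED,          // -1: never wait, choose from locally buffered data
        WAIT_FOR_LAG_ONLY, // 0: wait to learn the lag, but not for new data to be produced
        WAIT_FOR_NEW_DATA  // > 0: additionally wait this long once the lag is zero
    }

    // Reads the value, assuming "0" when the key is unset, and rejects values below -1.
    static long parse(Properties props) {
        long value = Long.parseLong(props.getProperty(MAX_TASK_IDLE_MS, "0"));
        if (value < -1) {
            throw new IllegalArgumentException(
                MAX_TASK_IDLE_MS + " must be -1 or a non-negative integer, got " + value);
        }
        return value;
    }

    // Maps an already validated value onto the three proposed behaviors.
    static Mode mode(long maxTaskIdleMs) {
        if (maxTaskIdleMs == -1) {
            return Mode.DISABLED;
        }
        return maxTaskIdleMs == 0 ? Mode.WAIT_FOR_LAG_ONLY : Mode.WAIT_FOR_NEW_DATA;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(MAX_TASK_IDLE_MS, "-1"); // opt out of idling entirely
        System.out.println(mode(parse(props)));
    }
}
```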


We also propose the following additions to the ConsumerRecords interface:

Code Block
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {

    ...

    public static final class Metadata {
        public long receivedTimestamp()
        public long position()
        public long lag()
        public long startOffset()
        public long endOffset()
    }
    
    ...

    public Map<TopicPartition, Metadata> metadata()

    ...

}

...

Compatibility, Deprecation, and Migration Plan

Streams Config Change

The config change is mostly backward compatible. The default behavior stays the same, because we change the default value from "0ms" to "-1 (disable idling)".

If users currently set a value for idling, they will observe upon upgrade that Streams starts to wait longer than before. This is not a strict violation of the old configuration, because Streams didn't previously promise to strictly adhere to the configured time, but rather just idle at least that long. From discussions with users on the mailing list and elsewhere, including in KAFKA-7458, we believe that most users already would have assumed (or at least prefer) that we actually wait for fetches before enforcing the timeout. Thus, although the behavior will change, it will hopefully be a beneficial change.

The most concerning case for backward behavioral compatibility is if users have already set the config to "0ms" (overriding the default with the same value). If these users wished to indicate that they never want Streams to wait on empty partitions, then they may be dismayed to see Streams starting to wait. They will have to change the config to "-1 (disable idling)" or just unset the config, since the default behavior is not to wait anyway. There doesn't seem to be any obvious way to overcome this, except to deprecate the existing configuration and replace it with a new one. If this consideration seems significant to reviewers, this is exactly what I would propose to do instead.

After this change, Streams will offer more intuitive join semantics by default at the expense of occasionally having to pause processing if we haven't gotten updated lag metadata from the brokers in a while. This is typically not expected, since the brokers send back as much metadata as they have available in each fetch response, so we do not anticipate a default performance degradation under normal circumstances.

ConsumerRecords#metadata addition

...