...

We propose to alter the max.task.idle.ms config to guarantee that Streams waits for fetches from the empty partitions before enforcing the timeout. This preserves the meaning of the parameter (how long to wait for more data to arrive for empty partitions before proceeding), but changes its behavior. Rather than starting to count against the timeout as soon as we discover an empty local buffer, we will only start to count after an empty local buffer is followed by an empty fetch result for that partition, and we will only count against the timeout when the lag is zero. I.e., if the lag is not known, then we have to wait for a fetch, and if the lag is non-zero, then we wait to poll the data. Thus, the configuration controls how long to wait for new data to be produced on the topic, regardless of fetch dynamics between the consumer and the brokers.

With this proposal:

  • UC2 can be satisfied with a configuration value of "max.task.idle.ms: 0", since it means they will do at least one extra fetch for their empty partitions, and if those partitions are still empty, they proceed right away.
  • UC3 can be satisfied with any positive integer value for "max.task.idle.ms". They will wait the configured amount of time while also being sure they actually fetch from the brokers before enforcing the timeout.
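
The waiting rule described above can be illustrated with a small decision function. This is only a sketch under stated assumptions: TaskIdleSketch, Decision, and decide are hypothetical names that do not exist in Kafka Streams, the configured value is assumed to be zero or positive, and the caller is assumed to track the time at which zero lag was first observed.

```java
import java.util.OptionalLong;

// Illustrative sketch only; none of these names exist in Kafka Streams.
final class TaskIdleSketch {

    /** Possible outcomes for a task that has at least one empty partition buffer. */
    enum Decision { WAIT_FOR_FETCH, WAIT_FOR_POLL, WAIT_FOR_DATA, PROCESS_NOW }

    /**
     * @param maxTaskIdleMs  configured idle time, assumed to be zero or positive here
     * @param lag            lag reported by the latest fetch, or empty if no fetch has reported it yet
     * @param zeroLagSinceMs wall-clock time at which zero lag was first observed (assumed tracked by the caller)
     * @param nowMs          current wall-clock time
     */
    static Decision decide(long maxTaskIdleMs, OptionalLong lag, long zeroLagSinceMs, long nowMs) {
        if (!lag.isPresent()) {
            // Lag unknown: a fetch has to complete before the timeout can start counting.
            return Decision.WAIT_FOR_FETCH;
        }
        if (lag.getAsLong() > 0) {
            // Data exists on the broker: wait to poll it rather than counting against the timeout.
            return Decision.WAIT_FOR_POLL;
        }
        // Lag is zero: only now does elapsed time count against the timeout.
        long idledMs = nowMs - zeroLagSinceMs;
        return idledMs >= maxTaskIdleMs ? Decision.PROCESS_NOW : Decision.WAIT_FOR_DATA;
    }

    public static void main(String[] args) {
        // UC2-style setting (0): proceed as soon as a fetch confirms zero lag.
        System.out.println(decide(0L, OptionalLong.of(0L), 100L, 100L));
        // UC3-style setting (500): keep waiting while the timeout has not elapsed.
        System.out.println(decide(500L, OptionalLong.of(0L), 100L, 300L));
    }
}
```

With a value of 0, the function never proceeds before a fetch has reported the lag, which matches the UC2 description; with a positive value, time is only counted once the lag is known to be zero.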

...

In the current API, Consumer#poll returns a ConsumerRecords instance, which is an Iterable collection of ConsumerRecord instances, and which also offers a ConsumerRecords#partitions method to list the partitions that are represented in the results. This list of partitions does not currently include partitions that were fetched but contain no results. We propose to include these "empty fetched" partitions in the listing of ConsumerRecords#partitions. Further, there is also a ConsumerRecords#records(partition) method that returns a list of records that were polled for that partition. If the caller supplies a partition for which there are no polled records, the method returns an empty list. Note that this continues to provide sound semantics when we start to include "empty fetched" partitions in the result of the partitions() call. For further considerations of backward compatibility, see "Compatibility, Deprecation, and Migration Plan" below.

Internally, when the consumer handles fetch responses from the brokers, it also receives metadata about the end offset of the partition(s), which it uses to update its metrics, including specifically a metric indicating the current lag. Theoretically, this could supply the information we need, except that we don't know how old those metrics are. I.e., for deterministic semantics, we need to wait for new fetches and to know the lag information as of those fetches.

We propose to expose this fetched metadata in a new method on ConsumerRecords: ConsumerRecords#metadata(): Map<TopicPartition, Metadata>, where Metadata includes position, lag, and endOffset information.

Streams will be able to use this new method by maintaining internal flags of which partitions have been fetched (even if they contained no records, by taking note of the partitions returned from ConsumerRecords#partitions), what the lag was at each fetch, and when the fetches took place.
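
The bookkeeping described above might look roughly like the following. This is a sketch, not the Streams implementation: FetchBookkeepingSketch and its methods are hypothetical, the nested Metadata class is a stand-in for the proposed ConsumerRecords.Metadata, and plain strings stand in for TopicPartition so that the example has no Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Illustrative sketch only; these names are hypothetical and not part of Kafka.
final class FetchBookkeepingSketch {

    /** Stand-in for the proposed ConsumerRecords.Metadata (position, lag, endOffset). */
    static final class Metadata {
        private final long position;
        private final long lag;
        private final long endOffset;

        Metadata(long position, long lag, long endOffset) {
            this.position = position;
            this.lag = lag;
            this.endOffset = endOffset;
        }

        long position() { return position; }
        long lag() { return lag; }
        long endOffset() { return endOffset; }
    }

    // Partition names (strings here, TopicPartition in practice) to the lag seen at the last fetch.
    private final Map<String, Long> lags = new HashMap<>();
    // Partition names to the wall-clock time of the last fetch.
    private final Map<String, Long> fetchTimesMs = new HashMap<>();

    /** Records, for every partition with metadata in this poll, its lag and the time of the poll. */
    void onPoll(Map<String, Metadata> metadata, long nowMs) {
        for (Map.Entry<String, Metadata> entry : metadata.entrySet()) {
            lags.put(entry.getKey(), entry.getValue().lag());
            fetchTimesMs.put(entry.getKey(), nowMs);
        }
    }

    /** Lag as of the last recorded fetch, or empty if the partition has not been fetched yet. */
    OptionalLong lagAtLastFetch(String partition) {
        Long lag = lags.get(partition);
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    }

    /** True if the partition was fetched at or after the given wall-clock time. */
    boolean fetchedAtOrAfter(String partition, long sinceMs) {
        Long fetchedAt = fetchTimesMs.get(partition);
        return fetchedAt != null && fetchedAt >= sinceMs;
    }
}
```

Keeping the fetch time next to the lag is what lets a caller distinguish "the lag was zero at some point" from "the lag was zero as of a fetch made after the buffer ran empty".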

Public Interfaces Affected

...

  • Change "max.task.idle.ms" to accept "-1" as a flag to disable task idling entirely
  • Change default of "max.task.idle.ms" to "-1" so that task idling continues to be disabled by default
  • Change "ConsumerRecords#partitions()" to list all partitions that were fetched, even if no records were returned
  • Add ConsumerRecords#metadata(): Map<TopicPartition, Metadata>, where Metadata includes position, lag, and endOffset information

Proposed Changes

We propose to alter the config value space of "max.task.idle.ms" in the following way:

  • Current:
    • negative integer: disallowed
    • 0 (default): indicates that Streams will never wait to buffer empty partitions before choosing the next record by timestamp
    • positive integer: indicates the amount of wall-clock time on the client to wait for more data to arrive on empty partitions (but no guarantee that the client will actually fetch again before the timeout expires).
  • Proposed:
    • negative integer other than -1: disallowed
    • -1 (default): indicates that Streams will never wait to buffer empty partitions before choosing the next record by timestamp
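
As a rough illustration of the proposed value space, the check below accepts -1 as the "idling disabled" flag and rejects anything smaller. MaxTaskIdleConfigSketch and parseIdleMs are hypothetical names used only for this sketch; it does not show how Kafka's own config validation is written, and it treats every value above -1 simply as an idle time in milliseconds.

```java
import java.util.OptionalLong;

// Illustrative sketch only; these names are hypothetical and not part of Kafka.
final class MaxTaskIdleConfigSketch {

    /** Flag value that disables task idling entirely under the proposal. */
    static final long DISABLED = -1L;

    /**
     * Interprets a configured "max.task.idle.ms" value.
     *
     * @return empty if idling is disabled (-1), otherwise the idle time in milliseconds
     * @throws IllegalArgumentException for negative values other than -1
     */
    static OptionalLong parseIdleMs(long configured) {
        if (configured < DISABLED) {
            throw new IllegalArgumentException(
                "max.task.idle.ms must be -1 (disabled) or non-negative, but was " + configured);
        }
        return configured == DISABLED ? OptionalLong.empty() : OptionalLong.of(configured);
    }

    public static void main(String[] args) {
        System.out.println(parseIdleMs(-1L).isPresent()); // idling disabled
        System.out.println(parseIdleMs(0L).getAsLong());  // idling enabled, zero idle time
    }
}
```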


We also propose to include all partitions that were fetched since the last Consumer#poll in the returned ConsumerRecords#partitions(). This includes partitions that were fetched, but returned no records. In addition, we propose the following additions to the ConsumerRecords interface:

public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {

    ...

    public static final class Metadata {
        public long position()
        public long lag()
        public long endOffset()
    }
    
    ...

    public Map<TopicPartition, Metadata> metadata()

    ...

}


Compatibility, Deprecation, and Migration Plan

...

The most concerning case for backward behavioral compatibility is if users have already set the config to "0ms" (overriding the default with the same value). If these users wished to indicate that they never want Streams to wait on empty partitions, then they may be dismayed to see Streams starting to wait. They will have to change the config to "-1" (disable idling) or just unset the config, since the default behavior is not to wait anyway. There doesn't seem to be any obvious way to overcome this, except to deprecate the existing configuration and replace it with a new one. If this consideration seems significant to reviewers, this is exactly what I would propose to do instead.

...

ConsumerRecords#metadata addition

All of the methods in ConsumerRecords will continue to behave in exactly the same way, except partitions(), which will start to return extra partitions for which there are no records. It seems like the only way that this could meaningfully affect a program is if the caller of ConsumerRecords#partitions() uses the returned partitions to list records via ConsumerRecords#records(partition). If they have an expectation that the latter call will never return an empty list, then we would start to violate that expectation.

From a distance, this expectation would seem to be invalid, since an empty list is already in the range of results for ConsumerRecords#records(partition). Playing devil's advocate, the following program would be broken and might be considered legitimate:

...
for (TopicPartition partition : consumerRecords.partitions()) {
  List<ConsumerRecord<K, V>> records = consumerRecords.records(partition);
  // Assumes records is never empty for a partition listed by partitions().
  highestFetchedOffsets.put(partition, records.get(records.size() - 1).offset());
  ...
}

This code tracks the highest fetched offset for each partition by simply getting the offset of the last record returned for each partition. Since partitions() would only return partitions for which we fetched at least one record, this code would today always work, since records.size() would always be at least 1 in practice, and records.get(records.size()-1) would therefore always return the last record in the list.

However, after the proposed change, this code would begin throwing an IndexOutOfBoundsException when we fetched no records for a partition, since records.size() would evaluate to 0 in that case, leading to the invalid records.get(-1). Clearly, the workaround is to trivially check whether the list is empty before indexing into it. The key question is whether use cases like this are expected and supported, and whether that would justify instead proposing a new method, like ConsumerRecords#fetchedPartitions(), with the desired semantics.

The metadata() method, by contrast, is a pure addition, so no existing use cases are affected by it. Also, the proposed interface can easily be expanded to include other metadata if desired in the future.
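
The empty-list guard mentioned above for the partitions() case can be sketched as follows. To stay self-contained, the example uses a plain map from partition name to fetched offsets as a stand-in for ConsumerRecords; HighestOffsetSketch and highestFetchedOffsets are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; the map argument stands in for ConsumerRecords.
final class HighestOffsetSketch {

    /**
     * Returns the highest fetched offset per partition, skipping partitions
     * that were listed but returned no records.
     */
    static Map<String, Long> highestFetchedOffsets(Map<String, List<Long>> offsetsByPartition) {
        Map<String, Long> highest = new HashMap<>();
        for (Map.Entry<String, List<Long>> entry : offsetsByPartition.entrySet()) {
            List<Long> offsets = entry.getValue();
            if (offsets.isEmpty()) {
                // "Empty fetched" partition: nothing to record, and get(size - 1) would throw.
                continue;
            }
            highest.put(entry.getKey(), offsets.get(offsets.size() - 1));
        }
        return highest;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> polled = new HashMap<>();
        polled.put("topic-0", Arrays.asList(5L, 6L, 7L));
        polled.put("topic-1", Collections.<Long>emptyList());
        System.out.println(highestFetchedOffsets(polled));
    }
}
```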

Rejected Alternatives

Streams Config Change

We could instead propose a new config with the desired semantics and deprecate the existing one. We judged it to be simpler for both users and maintainers to keep the config the same, since the semantics are so close to the existing ones. We are taking the perspective that the current "semantics" are really just a bugged implementation of the semantics we desired to provide in the first place.

ConsumerRecords#partitions Change

Instead of starting to return extra partitions in the ConsumerRecords#partitions() listing, we could propose to add a new method to include the partitions, like ConsumerRecords#fetchedPartitions(). The behavior change seems minor enough to justify just making the change in place, rather than introducing a new method to document and support from the maintainer side and learn about and use from the user side. Plus, the proposed behavior might be viewed as semantically compatible and otherwise superior to the current behavior anyway.

We considered whether Streams could instead rely on other returned metadata from the fetches, but no such metadata is currently available in the Consumer API. It is only used to update the Consumer's metrics. We would have to propose to expose it somehow in the API solely to support this use case, which seems disproportionate. Alternatively, we could let Streams rely on the Consumer metric values in the production code path, but this seems brittle and unwise. Finally, the available metrics/metadata don't seem to provide the desired level of determinism, since they would only tell the caller how much lag was observed at the last poll, not when the last poll took place. This would let Streams improve the idling behavior, but it wouldn't let Streams guarantee that it actually waits for the next poll before proceeding.

Exposing fetched partitions in ConsumerRecords

We considered simply exposing which partitions have been fetched within the poll in ConsumerRecords, either by adding empty-fetched partitions in `partitions()`, or by adding a new method to expose the partitions that had been fetched. The theory was that when the consumer gets an empty fetch response back from the brokers, it means that the partition in question has no more records to fetch. However, it turns out that the broker may also return empty responses for other reasons, such as:

  • Quota enforcement
  • Request/response size limits
  • etc. Essentially, it's within the broker's contract that it can return no data for any reason.