You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Status

Current state: Draft

Discussion thread: n/a

JIRA: Unable to render Jira issues macro, execution error.


Motivation

When Streams is processing a task with multiple inputs, each time it is ready to process a record, it has to choose which input to process next. It always takes from the input for which the next record has the least timestamp. The result of this is that Streams processes data in timestamp order. However, if the buffer for one of the inputs is empty, Streams doesn't know what timestamp the next record for that input will be.

Streams introduced a configuration "max.task.idle.ms" in KIP-353 to address this issue (https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization ).

The config allows Streams to wait some amount of time for data to arrive on the empty input, so that it can make a timestamp-ordered decision about which input to pull from next.

However, this config can be hard to use reliably and efficiently, since what we're really waiting for is the next poll that would return data from the empty input's partition, and this guarantee is a function of the poll interval, the max poll interval, and the internal logic that governs when Streams will poll again.

In the situation of a task with two (or more) inputs, there are three use cases:

UC1: I do not care about selecting among the inputs in order. I never want to wait to fetch data from empty inputs as long as I have at least one non-empty input. I just want to process records from any input as soon as they are available.

UC2: I would like to try and select among the inputs in order, but if some of the partitions are empty on the brokers , I don't want to wait for the producers to send more data.

UC3: I would like to try and select among the inputs in order, but if some of the partitions are empty on the brokers , I would like to wait some configured amount of time.

Proposed Solution

Streams Config change

We propose to alter the max.task.idle.ms config to guarantee to wait for fetches from the empty partitions before actualizing the timeout. This preserves the meaning of the parameter (how long to wait for more data to arrive for empty partitions before proceeding), but changes its behavior. Rather than starting to count against the timeout as soon as we discover an empty local buffer, we will only start to count after we have an empty local buffer followed by an empty fetch result for that partition .

With this proposal:

  • UC2 can be satisfied with a configuration value of "max.task.idle.ms: 0", since it means they will do at least one extra fetch for their empty partitions, and if those partitions are still empty, they proceed right away.
  • UC3 can be satisfied with any positive integer value for "max.task.idle.ms". They will wait the configured amount of time while also being sure they actually fetch from the brokers before enforcing the timeout.

Note that UC1 is not satisfiable with this alone, so we also propose to add an extra config value of "-1", which indicates that the task idling feature itself should be disabled. So:

  • UC1 can be satisfied with the configuration value of "max.task.idle.ms: -1", which indicates that Streams will never wait to buffer extra data, but always choose from what it already has buffered locally.

Consumer change

Finally, while analyzing possible implementations, we have determined that the current Consumer API does not offer a deterministic way to know whether a specific partition has been fetched. Right now, callers can only observe when a poll returns some data for a partition. If poll does not return data from a partition, it might have fetched and received no records, or it might not have fetched (yet). Therefore, we also prose to alter Consumer#poll to supply this information.

In the current API, Consumer#poll returns a ConsumerRecords instance, which is an Iterable collection of ConsumerRecord instances, and which also offers a ConsumerRecords#partitions method to list the partitions that are represented in the results. This list of partitions does not currently include partitions that were fetched but contain no results. We propose to include these "empty fetched" partitions in the listing of ConsumerRecords#partitions.

Further, there is also a ConsumerRecords#records(partition) that returns a list of records that were polled for that partition. If the caller supplies a partition for which there are no polled records, the method returns an empty list. Note that this continues to provide sound semantics when we start to include "empty fetched" partitions from the partitions() call. For further considerations of backward compatibility, see "Compatibility, Deprecation, and Migration Plan" below.

Streams will be able to use this new behavior by maintaining internal flags of which partitions have been fetched, even if they contained no records, by taking note of the partitions returned from ConsumerRecords#partitions.

Public Interfaces Affected

No Java interface changes, only:

  • Change "max.task.idle.ms" to accept "-1" as a flag to disable task idling entirely
  • Change default of "max.task.idle.ms" to "-1" so that task idling continues to be disabled by default
  • Change "ConsumerRecords#partitions()" to list all partitions that were fetched, even if no records were returned

Proposed Changes

We propose to alter the config value space of "max.task.idle.ms" in the following way:

  • Current:
    • negative integer: disallowed
    • 0 (default): indicates that Streams will never wait to buffer empty partitions before choosing the next record by timestamp
    • positive integer: indicates the amount of wall-clock time on the client to wait for more data to arrive on empty partitions (but no guarantee that the client will actually fetch again before the timeout expires).
  • Proposed:
    • negative integer other than -1: disallowed
    • -1 (default): indicates that Streams will never wait to buffer empty partitions before choosing the next record by timestamp

We also propose to include all partitions that were fetched since the last Consumer#poll in the returned ConsumerRecords#partitions(). This includes partitions that were fetched, but returned no records.

Compatibility, Deprecation, and Migration Plan

Streams Config Change

The config change is mostly  backward compatible. The default behavior stays the same, because we change the default value from "0ms" to "-1 (disable idling)".

If users currently set a value for idling, they will observe upon upgrade that Streams starts to wait longer than before. This is not a strict violation of the old configuration, because Streams didn't previously promise to strictly adhere to the configured time, but rather just idle at least  that long. From discussions with users on the mailing list and elsewhere, including in Unable to render Jira issues macro, execution error. , we believe that most users already would have assumed (or at least prefer) that we actually wait for fetches before enforcing the timeout. Thus, although the behavior will change, it will hopefully be a beneficial change.

The most concerning case for backward behavioral compatibility is if users have already set the config to "0ms" (overriding the default with the same value). If these users wished to indicate that they never want Streams to wait on empty partitions, then they may be dismayed to see Streams starting to wait. They will have to change the config to "-1 (disable idling)" or just unset the config, since the default behavior is not to wait anyway. There doesn't seem to be any obvious way to overcome this, except to deprecate the existing configuration and replace it with a new one. If this consideration seems significant to reviewers, this is exactly what I would propose to do instead.

ConsumerRecords#partitions Change

All of the methods in ConsumerRecords will continue to behave in exactly the same way, except partitions(), which will start to return extra partitions for which there are no records. It seems like the only way that this could meaningfully affect a program is if the caller of ConsumerRecords#partitions() uses the returned partitions to list records via ConsumerRecords#records(partition). If they have an expectation that the latter call will never return an empty list, then we would start to violate that expectation.

From a distance, this expectation would seem to be invalid, since an empty list is already in the range of results for ConsumerRecords#records(partition). Playing devil's advocate, the following program would be broken and might be considered legitimate:

...
for (partition : consumerRecords.partitions()) {
  records = consumerRecords.records(partition);
  highestFetchedOffsets.put(partition, records.get(records.size()-1).offset());
  ...
}

This code tracks the highest fetched offset for each partition by simply getting the offset of the last record returned for each partition. Since partitions() would only return partitions for which we fetched at least one record, this code would today always work, since records.size() would always be at least 1 in practice, and records.get(records.size()-1) would therefore always return the last record in the list.

However, after the proposed change, this code would begin throwing an IndexOutOfBoundsException when we fetched no records for a partition, since records.size() would evaluate to 0 in that case, leading to the invalid records.get(-1). Clearly, the workaround is to trivially check whether the list is empty before indexing into it. The key question is whether use cases like this are expected and supported, and whether it justified instead proposing a new method, like ConsumerRecords#fetchedPartitions() with the desired semantics.

Rejected Alternatives

Streams Config Change

We could instead propose a new config with the desired semantics and deprecate the existing one. We judged it to be simpler for both users and maintainers to keep the config the same, since the semantics are so close to the existing ones. We are taking the perspective that the current "semantics" are really just a bugged implementation of the semantics we desired to provide in the first place.

ConsumerRecords#partitions Change

Instead of starting to return extra partitions in the ConsumerRecords#partitions() listing, we could propose to add a new method to include the partitions, like ConsumerRecords#fetchedPartitions(). The behavior change seems minor enough to justify just making the change in place, rather than introducing a new method to document and support from the maintainer side and learn about and use from the user side. Plus, the proposed behavior might be viewed as semantically compatible and otherwise superior to the current behavior anyway.

We considered whether Streams could instead rely on other returned metadata from the fetches, but no such metadata is currently available in the Consumer API. It is only used to update the Consumer's metrics. We would have to propose to expose it somehow in the API solely to support this use case, which seems disproportionate. Alternatively, we could let Streams rely on the Consumer metric values in the production code path, but this seems brittle and unwise. Finally, the available metrics/metadata don't seem to provide the desired level of determinism, since they would only tell the caller how much lag was observed at the last poll, not when the last poll took place. This would let Streams improve the idling behavior, but it wouldn't let Streams guarantee that it actually waits for the next  poll before proceeding.

  • No labels