Table of Contents

Status

Current state: Accepted

Discussion thread: TBD link

JIRA: KAFKA-3514

...

Today, stream time inference and synchronization in Kafka Streams is tricky to understand, and it also introduces much non-determinism into the processing ordering when a task is fetching from multiple input streams (a.k.a. topic-partitions from Kafka).

More specifically, a stream task may contain multiple input topic-partitions, and hence we need to decide 1) which topic-partition to pick the next record from to process for this task, and 2) how the stream time will be advanced as records are being processed. Today this logic is determined in the following way:

...

c) The task's stream time is defined as the smallest value across all of its partitions' timestamps, and hence is also monotonically increasing.


The above behavior is motivated by the fact that for operations such as joins, we want to synchronize the timestamps across multiple input streams so that, on a best-effort basis, we can avoid processing "out-of-order" data across multiple topic-partitions (note that within a single topic-partition we always process data following the offset ordering).
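To make the synchronization rule above concrete, here is a minimal, self-contained sketch (not the actual Kafka Streams implementation; the class and method names are hypothetical) of how a task could pick the partition with the smallest buffered head-record timestamp and derive its stream time as the minimum across partitions:

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: a task tracks the head-record timestamp of each
// buffered input partition, picks the partition with the smallest timestamp to
// process next, and defines its stream time as the minimum across partitions.
class StreamTimeSketch {
    private final Map<String, Long> headTimestampPerPartition = new HashMap<>();

    void onRecordBuffered(String partition, long headTimestamp) {
        headTimestampPerPartition.put(partition, headTimestamp);
    }

    // Decision 1): choose the partition whose next buffered record has the smallest timestamp.
    String choosePartitionToProcess() {
        String chosen = null;
        long smallest = Long.MAX_VALUE;
        for (Map.Entry<String, Long> entry : headTimestampPerPartition.entrySet()) {
            if (entry.getValue() < smallest) {
                smallest = entry.getValue();
                chosen = entry.getKey();
            }
        }
        return chosen;
    }

    // Decision 2): the task's stream time is the smallest head timestamp across all partitions,
    // so it only moves forward as records are processed in timestamp order.
    long streamTime() {
        long min = Long.MAX_VALUE;
        for (long ts : headTimestampPerPartition.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }
}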

We have observed that the above mechanism has the following issue:

...

Code Block
languagejava
// the maximum wall-clock time a task will stay idle (i.e. not be processed) while it still has buffered data in at least one of its partitions
public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
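
For illustration, a minimal sketch of how an application could set this config when constructing its Streams properties; the application id and bootstrap servers below are placeholders, and we assume the proposed constant is exposed via StreamsConfig:

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class MaxTaskIdleConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        // Assumption: the proposed MAX_TASK_IDLE_MS_CONFIG constant is added to StreamsConfig.
        // Wait up to 100 ms of wall-clock time for all of a task's partitions to have buffered
        // data before enforcing processing on the partitions that already do.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L);
        // ... build the topology and start KafkaStreams with these properties
    }
}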


When a task is enforced to be processed via this config, it could potentially introduce out-of-order processing (i.e. you may be processing a record with timestamp t0 while the stream time has already been advanced to t1 > t0). Such enforced processing is also non-deterministic and might result in a different output than the deterministic re-processing case. Users can use this config to control how much latency they are willing to pay (i.e. how long the task is left unprocessed) in order to reduce the possibility of out-of-order processing: when it is set to Long.MAX_VALUE, we will wait indefinitely on the task (i.e. no data will be processed for the task until all of its partitions have some buffered data); on the other extreme, when it is set to 0 we will always process the task whenever it has some data, bearing in mind that such enforced processing may cause out-of-order data. In order to let users be notified of such potential "out-of-order" events, we will also introduce a task-level metric:

...