Status

Current state: "Under Discussion"

Discussion thread: TODO

JIRA: KAFKA-6455

Released: 1.2

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka Streams DSL inherits the timestamp propagation "contract" from the Processor API. Under this contract, each output record is assigned the timestamp of the current input record. For many operators, such as filter and map, this contract is reasonable. For more complex operators such as aggregations or joins, however, it is not optimal, because it provides no strong guarantees about the timestamps of the result records. This makes reasoning about the expected result hard, and the result is non-deterministic with regard to the record timestamp. In event-time stream processing, however, the timestamp is as important as the actual data. Hence, we should define a DSL-level contract that matches the semantics of each DSL operator.

Public Interfaces

No public interface changes. This KIP proposes a semantic change only.

Proposed Changes

For the following operators nothing changes:

For these operations, all output records get the same timestamp as the input record (note that flatMap and flatMapValues might produce multiple output records for a single input record). Also note that for a KStream-KTable join, only the KStream input can trigger an output record.
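To illustrate this unchanged contract, the following Java sketch models flatMapValues on a single input record: every output record it produces carries the input record's timestamp. `TimestampedRecord` and `StatelessTimestampContract` are hypothetical names introduced only for illustration; they are not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified record model (value plus timestamp); NOT a Kafka Streams class.
final class TimestampedRecord<V> {
    final V value;
    final long timestamp;

    TimestampedRecord(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class StatelessTimestampContract {
    // Models flatMapValues: one input may yield several outputs,
    // and every output inherits the input record's timestamp unchanged.
    static <V, R> List<TimestampedRecord<R>> flatMapValues(
            TimestampedRecord<V> input, Function<V, List<R>> mapper) {
        List<TimestampedRecord<R>> out = new ArrayList<>();
        for (R value : mapper.apply(input.value)) {
            out.add(new TimestampedRecord<>(value, input.timestamp));
        }
        return out;
    }
}
```

For example, splitting the value "a b c" with timestamp 42 yields three output records, each with timestamp 42.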

We propose changing the timestamp inheritance semantics for the following operators:

This KIP builds on KIP-259 and KIP-251. With timestamps stored for aggregations and joins in the underlying RocksDB stores, we can compute the output record's timestamp as "max(oldTimestamp, recordTimestamp)" (for aggregations) and "max(r1.t, r2.t)" (for joins), and set this timestamp explicitly when forwarding the output record downstream.
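A minimal sketch of the proposed rules in plain Java, assuming a hypothetical in-memory map that keeps a timestamp next to each aggregate (standing in for the RocksDB stores described above). `TimestampedAggregate`, `CountWithTimestamps`, `process`, and `joinTimestamp` are illustrative names only, not Kafka Streams APIs; the count aggregation is just an example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical aggregate value stored together with its timestamp.
final class TimestampedAggregate {
    final long count;
    final long timestamp;

    TimestampedAggregate(long count, long timestamp) {
        this.count = count;
        this.timestamp = timestamp;
    }
}

final class CountWithTimestamps {
    // Stand-in for a key-value store that also keeps the aggregate's timestamp.
    private final Map<String, TimestampedAggregate> store = new HashMap<>();

    // Aggregation rule: output timestamp = max(oldTimestamp, recordTimestamp).
    TimestampedAggregate process(String key, long recordTimestamp) {
        TimestampedAggregate old = store.get(key);
        long newCount = (old == null) ? 1 : old.count + 1;
        long newTimestamp = (old == null)
                ? recordTimestamp
                : Math.max(old.timestamp, recordTimestamp);
        TimestampedAggregate updated = new TimestampedAggregate(newCount, newTimestamp);
        store.put(key, updated);
        // In the DSL, this result would be forwarded with newTimestamp set explicitly.
        return updated;
    }

    // Join rule: output timestamp = max(r1.t, r2.t) of the two joined records.
    static long joinTimestamp(long leftTimestamp, long rightTimestamp) {
        return Math.max(leftTimestamp, rightTimestamp);
    }
}
```

If records with timestamps 10 and then 5 arrive for the same key, both results carry timestamp 10, whereas the current contract would assign 5 to the second result. The latest aggregate's timestamp is therefore the maximum over all input timestamps for that key, independent of arrival order.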

Compatibility, Deprecation, and Migration Plan

This is a semantic change only and is thus backward compatible.

Test Plan

We will test the new behavior of the operators with the existing unit and integration tests, updated to also check the output record timestamps.

Rejected Alternatives

None.