...
- KStream#join(KTable, ...) and KStream#join(GlobalKTable, ...): stream-table join processors will be updated to perform timestamped lookups if the table is versioned.
- KTable#join(KTable, ...): table-table join processors, including foreign-key joins, will be updated to not produce new join results on out-of-order records (by key) from versioned tables.
- KTable#filter(...): table filter processors today have an optimization to drop null results if the previous result for the same key was also null. This optimization will be disabled if the table being filtered is versioned.
- KGroupedTable#aggregate(), and related methods KGroupedTable#count() and KGroupedTable#reduce(): table aggregate processors will be updated to ignore out-of-order records (by key) when aggregating a versioned table.
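To make "timestamped lookup" concrete, the following is a minimal sketch of the idea using a toy in-memory model. The class VersionedTableSketch and its methods are hypothetical and exist only for illustration; they are not the Kafka Streams versioned store API, and the model ignores history retention and tombstones.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of a versioned table (hypothetical, not a Kafka Streams class). */
class VersionedTableSketch {
    // key -> (timestamp at which a version became valid -> value)
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    /** Records that `key` took `value` starting at `timestamp`. */
    void put(String key, String value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Lookup that ignores the stream record's timestamp: newest version in this model. */
    String getLatest(String key) {
        TreeMap<Long, String> history = versions.get(key);
        return history == null ? null : history.lastEntry().getValue();
    }

    /** Timestamped lookup: the version valid as of `asOfTimestamp`, or null if none. */
    String get(String key, long asOfTimestamp) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedTableSketch table = new VersionedTableSketch();
        table.put("k", "v1", 1L);
        table.put("k", "v2", 10L);
        // A stream record with timestamp 5 arrives after the table has already seen v2.
        System.out.println(table.getLatest("k")); // prints v2
        System.out.println(table.get("k", 5L));   // prints v1
    }
}
```

In this model, a stream-side record with timestamp 5 joins against v1, the table value that was valid at that time, rather than against the newest value v2.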
As part of implementing the proposed change to table aggregate processors above, the table repartition topic format will also be updated to include an extra timestamp – the timestamp of the "old value" being removed from the aggregate.
Continue to the next paragraph for the precise definition of what counts as a "versioned table."
...
The changes proposed to processor semantics above only apply to processors operating on versioned tables. A table is versioned if and only if it is materialized as a versioned store upstream, with no materialization as an unversioned store or stateful transformation (aggregation or join) in between. In other words, a table will no longer be considered versioned downstream if any of the following occur:
- an explicit materialization, i.e., passing a Materialized instance into a method which produces a resulting KTable, as an unversioned store
- a stateful operation (aggregation or join) without an explicit Materialized instance
  - for aggregations, this happens because all aggregations are materialized, and implicit materialization today is always unversioned
  - for foreign-key table-table joins, this happens because all foreign-key joins are materialized, and implicit materialization today is always unversioned
  - for primary-key table-table joins, the result is not materialized by default but the result table should not be considered versioned today (see section on table-table joins below), and this design choice helps make this condition about stateful operations easier to reason about
- calling KTable#suppress() – suppression intentionally collapses version history, and therefore the result is no longer versioned. (Suppress can also be considered stateful.)
- converting to a stream and calling KStream#toTable() to convert back. Converting a table to a stream causes the table to lose its versioned/unversioned status. In order to preserve such a table as versioned, users can pass an explicit Materialized instance into the KStream#toTable() call to materialize the table with a versioned store.
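The rules above can be read as a simple fold over the operations applied to a table. The sketch below is one possible encoding of that reading; the Op labels and the isVersioned helper are hypothetical names introduced for illustration and do not correspond to Kafka Streams types. It assumes that stateless operations without materialization leave the status unchanged, which follows from the definition above.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of how versioned status propagates downstream (hypothetical names). */
class VersionedStatusSketch {
    enum Op {
        MATERIALIZE_VERSIONED,    // explicit Materialized backed by a versioned store
        MATERIALIZE_UNVERSIONED,  // explicit Materialized backed by an unversioned store
        STATEFUL_NO_MATERIALIZED, // aggregation or join without an explicit Materialized
        SUPPRESS,                 // KTable#suppress()
        TO_STREAM_TO_TABLE,       // convert to a stream and back, no Materialized passed
        STATELESS                 // stateless operation with no materialization
    }

    /** Returns whether the table at the end of `pipeline` counts as versioned. */
    static boolean isVersioned(List<Op> pipeline) {
        boolean versioned = false;
        for (Op op : pipeline) {
            switch (op) {
                case MATERIALIZE_VERSIONED:
                    versioned = true;  // a versioned store (re)establishes the status
                    break;
                case STATELESS:
                    break;             // status carries through unchanged
                default:
                    versioned = false; // every other listed operation drops the status
            }
        }
        return versioned;
    }

    public static void main(String[] args) {
        System.out.println(isVersioned(Arrays.asList(
            Op.MATERIALIZE_VERSIONED, Op.STATELESS)));          // prints true
        System.out.println(isVersioned(Arrays.asList(
            Op.MATERIALIZE_VERSIONED, Op.SUPPRESS)));           // prints false
        System.out.println(isVersioned(Arrays.asList(
            Op.MATERIALIZE_VERSIONED, Op.TO_STREAM_TO_TABLE,
            Op.MATERIALIZE_VERSIONED)));                        // prints true
    }
}
```

The last case models passing a versioned Materialized into KStream#toTable(): the round trip through a stream drops the status, and the explicit versioned materialization restores it.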
...
| Record (all with same key) | Effect on unversioned aggregate | Effect on versioned aggregate |
|---|---|---|
| (v1, ts=1) | add: v1 | add: v1 |
| (v2, ts=10) | remove: v1 ; add: v2 | remove: v1 ; add: v2 |
| (v3, ts=5) | remove: v2 ; add: v3 | nothing -- v3 has an older timestamp than v2 |
Additional Timestamp in Table Repartition Topic
In order to have table aggregate processors ignore out-of-order records, the processor needs a way to determine whether a record is out-of-order. This cannot be done by querying the source table materialization (to determine the latest timestamp), because the source table materialization is part of a different subtopology: a repartition occurs before the aggregation. As such, this timestamp must be passed via the repartition topic. Specifically, the timestamp that will be passed is the timestamp of the "old record" – the one which is currently part of the aggregation result, and may need to be removed accordingly.
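As an illustration of how the extra timestamp could be used, here is a minimal sketch that replays the example records (v1, v2, v3) through a toy aggregate. The Change class, its field names, and the rule "drop the record when the new timestamp is older than the old record's timestamp" are assumptions made for this sketch; they are not the actual repartition topic format or processor implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy table aggregate that can drop out-of-order updates (hypothetical names). */
class TableAggregateSketch {
    /** Hypothetical payload: new and old values, each with its own timestamp. */
    static final class Change {
        final String newValue;
        final long newTimestamp;
        final String oldValue;    // null if there is no previous value for the key
        final long oldTimestamp;  // the extra timestamp; ignored when oldValue is null

        Change(String newValue, long newTimestamp, String oldValue, long oldTimestamp) {
            this.newValue = newValue;
            this.newTimestamp = newTimestamp;
            this.oldValue = oldValue;
            this.oldTimestamp = oldTimestamp;
        }
    }

    private final boolean versioned;
    private final List<String> members = new ArrayList<>();

    TableAggregateSketch(boolean versioned) {
        this.versioned = versioned;
    }

    /** Applies one change and returns a description of its effect on the aggregate. */
    String process(Change c) {
        // Assumed check: the update is out-of-order if it is older than the record it replaces.
        boolean outOfOrder = c.oldValue != null && c.newTimestamp < c.oldTimestamp;
        if (versioned && outOfOrder) {
            return "nothing";
        }
        List<String> effects = new ArrayList<>();
        if (c.oldValue != null) {
            members.remove(c.oldValue);
            effects.add("remove: " + c.oldValue);
        }
        if (c.newValue != null) {
            members.add(c.newValue);
            effects.add("add: " + c.newValue);
        }
        return String.join(" ; ", effects);
    }

    List<String> members() {
        return members;
    }

    public static void main(String[] args) {
        TableAggregateSketch aggregate = new TableAggregateSketch(true);
        System.out.println(aggregate.process(new Change("v1", 1L, null, -1L)));
        System.out.println(aggregate.process(new Change("v2", 10L, "v1", 1L)));
        System.out.println(aggregate.process(new Change("v3", 5L, "v2", 10L)));
    }
}
```

With versioned set to true, the third record leaves the aggregate untouched; with versioned set to false, the same record removes v2 and adds v3, matching the two columns of the example above.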
Normally, evolving the repartition topic format to add an additional timestamp would introduce a number of compatibility and upgrade implications. In this case, however, KIP-904 has already proposed an update to the same repartition topic format, and is slated for the same release as this KIP. This means we can "piggyback" the change to add an additional timestamp without introducing additional compatibility/upgrade implications, as long as the two changes land in the same release.
The repartition topic format proposed in KIP-904 is:
|
This KIP proposes to update it to
|
...
Other Processors and the Long-Term Vision
...