Comment: remove proposal to update repartition topic format + minor update to definition of "versioned" table

...

  • KStream#join(KTable, ...) and KStream#join(GlobalKTable, ...): stream-table join processors will be updated to perform timestamped lookups if the table is versioned
  • KTable#join(KTable, ...): table-table join processors, including foreign key joins, will be updated to not produce new join results on out-of-order records (by key) from versioned tables
  • KTable#filter(...): table filter processors today have an optimization to drop null results if the previous result for the same key was also null. This optimization will be disabled if the table being filtered is versioned.
  • KGroupedTable#aggregate(), and the related methods KGroupedTable#count() and KGroupedTable#reduce(): table aggregate processors will be updated to ignore out-of-order records (by key) when aggregating a versioned table.
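To illustrate what a "timestamped lookup" means for the stream-table join bullet above, here is a minimal in-memory sketch in Java. It is not Kafka Streams code. The class TimestampedLookupSketch, its methods, and the per-key TreeMap standing in for a versioned store are all hypothetical.

With a versioned table, the sketch looks up the table value that was valid as of the stream record's timestamp. The other branch returns the newest table value regardless of the stream record's timestamp.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch, not the Kafka Streams API: a TreeMap per key stands in for a versioned store.
final class TimestampedLookupSketch {
    // key -> (record timestamp -> value)
    private final Map<String, NavigableMap<Long, String>> history = new HashMap<>();

    void put(String key, String value, long timestamp) {
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Value valid as of asOfTimestamp: the newest version whose timestamp is <= asOfTimestamp.
    String getAsOf(String key, long asOfTimestamp) {
        NavigableMap<Long, String> versions = history.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    // Newest version by timestamp, ignoring the caller's timestamp.
    String getLatest(String key) {
        NavigableMap<Long, String> versions = history.get(key);
        return (versions == null || versions.isEmpty()) ? null : versions.lastEntry().getValue();
    }

    // Inner join of one stream record; returns null when there is no matching table value.
    String join(String key, String streamValue, long streamTimestamp, boolean tableIsVersioned) {
        String tableValue = tableIsVersioned ? getAsOf(key, streamTimestamp) : getLatest(key);
        return tableValue == null ? null : streamValue + "+" + tableValue;
    }
}
```

Consider table versions t1 at ts=1 and t2 at ts=10. A stream record at ts=5 joins with t1 in the versioned case and with t2 otherwise. A stream record at ts=0 finds no versioned table value and produces no result.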

As part of implementing the proposed change to table aggregate processors above, the table repartition topic format will also be updated to include an extra timestamp: the timestamp of the "old value" being removed from the aggregate.

Continue to the next paragraph for the precise definition of what counts as a "versioned table."

...

The changes proposed to processor semantics above only apply to processors operating on versioned tables. A table is versioned iff it is materialized as a versioned store upstream, with no materialization as an unversioned store and no stateful transformation (aggregation or join) in between. In other words, a table will no longer be considered versioned downstream if any of the following occur:

  • an explicit materialization as an unversioned store, i.e., passing a Materialized instance for an unversioned store into a method which produces a resulting KTable
  • a stateful operation (aggregation or join) without an explicit Materialized instance
    • for aggregations, this happens because all aggregations are materialized, and implicit materialization today is always unversioned
    • for foreign-key table-table joins, this happens because all foreign-key joins are materialized, and implicit materialization today is always unversioned
    • for primary-key table-table joins, the result is not materialized by default, but the result table should not be considered versioned today (see the section on table-table joins below); this design choice makes the condition about stateful operations easier to reason about
  • calling KTable#suppress(): suppression intentionally collapses version history, and therefore the result is no longer versioned. (Suppress can also be considered stateful.)
  • converting to a stream and calling KStream#toTable() to convert back. Converting a table to a stream causes the table to lose its versioned/unversioned status. In order to preserve such a table as versioned, users can pass an explicit Materialized instance into the KStream#toTable() call to materialize the table with a versioned store.
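The rules above can be condensed into a small decision procedure. The following Java sketch is a hypothetical model of those rules only. The names VersionedTableRules, Kind, Materialization and Step are invented for illustration, and the sketch does not reflect how Kafka Streams tracks this internally.

In the model, each step in a chain of operations either sets the versioned status, clears it, or inherits it from upstream.

```java
import java.util.List;

// Hypothetical model of the "is this table versioned?" rules; not how Kafka Streams tracks it internally.
final class VersionedTableRules {
    enum Materialization { NONE, VERSIONED, UNVERSIONED }

    enum Kind {
        SOURCE_OR_STATELESS, // e.g. a source table, filter or mapValues
        AGGREGATION,
        FOREIGN_KEY_JOIN,
        PRIMARY_KEY_JOIN,
        SUPPRESS,
        STREAM_ROUND_TRIP    // toStream() followed by KStream#toTable()
    }

    static final class Step {
        final Kind kind;
        final Materialization materialization; // explicit Materialized passed to this step, if any

        Step(Kind kind, Materialization materialization) {
            this.kind = kind;
            this.materialization = materialization;
        }
    }

    static boolean isVersioned(List<Step> steps) {
        boolean versioned = false; // a table that was never materialized is not versioned
        for (Step step : steps) {
            if (step.kind == Kind.SUPPRESS) {
                versioned = false; // suppression collapses version history
            } else if (step.materialization == Materialization.VERSIONED) {
                versioned = true;  // explicit versioned materialization at this step
            } else if (step.materialization == Materialization.UNVERSIONED) {
                versioned = false; // explicit unversioned materialization
            } else if (step.kind != Kind.SOURCE_OR_STATELESS) {
                versioned = false; // stateful step or stream round trip without an explicit Materialized
            }
            // otherwise: a stateless step without Materialized keeps the upstream status
        }
        return versioned;
    }
}
```

In this model, a versioned source followed by a filter stays versioned. Adding an aggregation without Materialized, a suppress(), or a plain toStream().toTable() round trip makes the result unversioned.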

...

Record (all with same key)    Effect on unversioned aggregate    Effect on versioned aggregate
--------------------------------------------------------------------------------------------------------------
(v1, ts=1)                    add: v1                            add: v1
(v2, ts=10)                   remove: v1 ; add: v2               remove: v1 ; add: v2
(v3, ts=5)                    remove: v2 ; add: v3               nothing -- v3 has an older timestamp than v2
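The table above can be replayed with a small single-key simulation. The following Java sketch is an illustration under stated assumptions, not the actual processors:

  • it uses a sum aggregate with hypothetical numeric values v1=1, v2=2, v3=3
  • it models each update reaching the aggregate as a Change carrying the new value and timestamp together with the old value and its timestamp, following the approach in the section on the additional timestamp in the table repartition topic
  • the versioned aggregate ignores a change whose new timestamp is older than the old record's timestamp

```java
// Hypothetical single-key simulation of the table above; not the actual Kafka Streams processors.
final class VersionedAggregateSketch {
    // The update crossing the repartition topic: new value/timestamp plus old value/timestamp.
    static final class Change {
        final long newValue;
        final long newTimestamp;
        final Long oldValue;     // null if the key had no previous value
        final Long oldTimestamp; // null if the key had no previous value

        Change(long newValue, long newTimestamp, Long oldValue, Long oldTimestamp) {
            this.newValue = newValue;
            this.newTimestamp = newTimestamp;
            this.oldValue = oldValue;
            this.oldTimestamp = oldTimestamp;
        }
    }

    private final boolean versioned;
    private Long latestValue;     // upstream source table: latest record for the key
    private Long latestTimestamp;
    private long sum;             // downstream aggregate: sum of the values currently added

    VersionedAggregateSketch(boolean versioned) {
        this.versioned = versioned;
    }

    // Upstream side: emit a Change whose "old" part is the table's current latest record.
    Change toChange(long value, long timestamp) {
        Change change = new Change(value, timestamp, latestValue, latestTimestamp);
        // An unversioned table always overwrites; a versioned table's latest value only
        // changes when the new record is not older than the current latest one.
        if (!versioned || latestTimestamp == null || timestamp >= latestTimestamp) {
            latestValue = value;
            latestTimestamp = timestamp;
        }
        return change;
    }

    // Downstream side: remove the old value and add the new one, unless the change is out-of-order.
    void aggregate(Change change) {
        if (versioned && change.oldTimestamp != null && change.newTimestamp < change.oldTimestamp) {
            return; // out-of-order record: older than the value already in the aggregate
        }
        if (change.oldValue != null) {
            sum -= change.oldValue;
        }
        sum += change.newValue;
    }

    void process(long value, long timestamp) {
        aggregate(toChange(value, timestamp));
    }

    long result() {
        return sum;
    }
}
```

Feeding (1, ts=1), (2, ts=10) and (3, ts=5) leaves the unversioned sum at 3 (v3) and the versioned sum at 2 (v2). This matches the last row of the table.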

Additional Timestamp in Table Repartition Topic

In order for the table aggregate processor to drop out-of-order records, it needs a way to determine whether a record is out-of-order. This cannot be done by querying the source table materialization (to determine the latest timestamp), because the source table materialization is part of a different subtopology: a repartition occurs before the aggregation. As such, this timestamp must be passed via the repartition topic. Specifically, the timestamp that will be passed is the timestamp of the "old record", i.e., the one which is currently part of the aggregation result and may need to be removed accordingly.

Normally, evolving the repartition topic format in order to add an additional timestamp would introduce a number of compatibility and upgrade implications, but in this case KIP-904 has already proposed an update to the same repartition topic format and is slated for the same release as this KIP. This means we can "piggyback" the change to add an additional timestamp without introducing additional compatibility/upgrade implications, as long as the two changes land in the same release.

The repartition topic format proposed in KIP-904 is:

{BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
{BYTE_ARRAY newValue}{BYTE newOldFlag=1}
{UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}

This KIP proposes to update it to:

{BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
{BYTE_ARRAY newValue}{BYTE newOldFlag=1}
{UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{INT64 oldTimestamp}{BYTE newOldFlag=2}
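To make the byte layout above concrete, here is a Java sketch that encodes and decodes the three value shapes. It includes the proposed INT64 oldTimestamp in the newOldFlag=2 case.

The class and method names are hypothetical. The byte order is also an assumption: the sketch uses java.nio.ByteBuffer's default big-endian order, since the layouts above do not specify one. Because the flag is always the last byte, the decoder reads it first and then slices the preceding bytes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of the value layouts above; big-endian byte order is an assumption.
final class RepartitionFormatSketch {
    static final byte OLD_ONLY = 0;    // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
    static final byte NEW_ONLY = 1;    // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
    static final byte NEW_AND_OLD = 2; // {UINT32 newDataLength}{newValue}{oldValue}{INT64 oldTimestamp}{flag=2}

    static byte[] encodeOldOnly(byte[] oldValue) {
        return ByteBuffer.allocate(oldValue.length + 1).put(oldValue).put(OLD_ONLY).array();
    }

    static byte[] encodeNewOnly(byte[] newValue) {
        return ByteBuffer.allocate(newValue.length + 1).put(newValue).put(NEW_ONLY).array();
    }

    static byte[] encodeNewAndOld(byte[] newValue, byte[] oldValue, long oldTimestamp) {
        return ByteBuffer.allocate(4 + newValue.length + oldValue.length + 8 + 1)
                .putInt(newValue.length) // written as a signed int; array lengths always fit in UINT32
                .put(newValue)
                .put(oldValue)
                .putLong(oldTimestamp)
                .put(NEW_AND_OLD)
                .array();
    }

    // Decoded fields; parts absent from a given layout stay null.
    static final class Decoded {
        byte[] newValue;
        byte[] oldValue;
        Long oldTimestamp;
    }

    static Decoded decode(byte[] bytes) {
        Decoded decoded = new Decoded();
        int flagIndex = bytes.length - 1; // the flag is always the last byte
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        switch (bytes[flagIndex]) {
            case OLD_ONLY:
                decoded.oldValue = Arrays.copyOfRange(bytes, 0, flagIndex);
                break;
            case NEW_ONLY:
                decoded.newValue = Arrays.copyOfRange(bytes, 0, flagIndex);
                break;
            case NEW_AND_OLD: {
                int newLength = buffer.getInt(0);
                int oldStart = 4 + newLength;
                int timestampStart = flagIndex - 8; // INT64 sits just before the flag
                decoded.newValue = Arrays.copyOfRange(bytes, 4, oldStart);
                decoded.oldValue = Arrays.copyOfRange(bytes, oldStart, timestampStart);
                decoded.oldTimestamp = buffer.getLong(timestampStart);
                break;
            }
            default:
                throw new IllegalArgumentException("unknown newOldFlag: " + bytes[flagIndex]);
        }
        return decoded;
    }
}
```

The old value's length is not stored explicitly. The decoder derives it from the total length minus the new-value prefix and the fixed-size timestamp and flag at the end.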

...

Other Processors and the Long-Term Vision

...