Comment: disable for suppress and global tables, and update return type of put

...

This KIP aims to clarify and define intended processor behavior with versioned stores.

Public Interfaces

DSL Processors

There are no changes to the public interfaces themselves, but the behavior of these existing public interfaces will change when the relevant state stores are versioned:

  • KStream#join(KTable, ...): stream-table join processors will be updated to perform timestamped lookups if the table is versioned
  • KTable#join(KTable, ...): table-table join processors, including foreign key joins, will be updated to not produce new join results on out-of-order records (by key) from versioned tables
  • KTable#filter(...): table filter processors today have an optimization to drop null results if the previous result for the same key was also null. This optimization will be disabled if the table being filtered is versioned.
  • KGroupedTable#aggregate() and the related methods KGroupedTable#count() and KGroupedTable#reduce(): table aggregate processors will be updated to ignore out-of-order records (by key) when aggregating a versioned table.
  • KTable#suppress(): this method will be disabled for versioned tables, and a TopologyException will be thrown if it is called on one.
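The filter bullet above can be pictured as a small forwarding decision. This is a minimal sketch, assuming a hypothetical FilterSketch helper that receives the new and previous upstream values for a key; it is not the actual Kafka Streams filter processor.

```java
import java.util.function.Predicate;

// Hypothetical model of the table-filter forwarding decision (not Kafka Streams code).
final class FilterSketch {

    // Applies the filter: a value that fails the predicate (or is already null) becomes null.
    static <V> V applyFilter(V value, Predicate<V> predicate) {
        return (value != null && predicate.test(value)) ? value : null;
    }

    // Decides whether a filter result is forwarded downstream, given the upstream
    // table's new and previous values for the same key.
    static <V> boolean shouldForward(V newValue, V oldValue, Predicate<V> predicate, boolean versioned) {
        V newResult = applyFilter(newValue, predicate);
        V oldResult = applyFilter(oldValue, predicate);
        // Existing optimization: a null result following a null result is dropped,
        // but only when the table being filtered is unversioned.
        if (newResult == null && oldResult == null && !versioned) {
            return false;
        }
        // For versioned tables the optimization is disabled, so every null result is forwarded.
        return true;
    }
}
```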

For the precise definition of what counts as a "versioned table," see the first paragraph of the "Proposed Changes" section.

...

Versioned Stores

There are also a couple of updates to versioned stores themselves, to help shore up processor semantics:

  • GlobalKTables will no longer be allowed to be versioned
  • VersionedKeyValueStore#put(...) will now have a return type: a long which is the validTo timestamp of the newly put record, with two special values to indicate either that no such timestamp exists (because the record is the latest for its key) or that the put did not take place (because the grace period has elapsed). This also requires a corresponding change in VersionedBytesStore: specifically, a new put(key, value, timestamp) method will be added.
Code Block
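import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedBytesStore;

public interface VersionedKeyValueStore<K, V> extends StateStore {

  // Returns the validTo timestamp of the newly put record, -1 if the record is the
  // latest version for its key, or Long.MIN_VALUE if the put did not take place.
  long put(K key, V value, long timestamp); // <-- replaces the existing `void put(K key, V value, long timestamp);`

  // other existing methods unchanged
}

public interface VersionedBytesStore extends KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {

  // Same return-value contract as VersionedKeyValueStore#put(...)
  long put(Bytes key, byte[] value, long timestamp); // <-- new method

  // other existing methods unchanged
}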

Proposed Changes

The changes proposed to processor semantics above apply only to processors operating on versioned tables. A table is versioned if and only if it is materialized as a versioned store upstream, with neither a materialization as an unversioned store nor a stateful transformation (aggregation or join) in between. In other words, a table is no longer considered versioned downstream if any of the following occur:

  • an explicit materialization as an unversioned store, i.e., passing a Materialized instance for an unversioned store into a method which produces a resulting KTable
  • a stateful operation (aggregation or join) without an explicit Materialized instance
    • for aggregations, this happens because all aggregations are materialized, and implicit materialization today is always unversioned
    • for foreign-key table-table joins, this happens because all foreign-key joins are materialized, and implicit materialization today is always unversioned
    • for primary-key table-table joins, the result is not materialized by default, but the result table should still not be considered versioned today (see the section on table-table joins below), and this design choice makes the condition about stateful operations easier to reason about
  • converting to a stream and calling KStream#toTable() to convert back. Converting a table to a stream causes the table to lose its versioned/unversioned status. In order to preserve such a table as versioned, users can pass an explicit Materialized instance into the KStream#toTable() call to materialize the table with a versioned store.

By default, all tables in an application are unversioned. A table first becomes versioned only once a user opts in by passing a Materialized instance to materialize it with a versioned store. Tables downstream of it are implicitly versioned as well, until one of the conditions above causes a downstream table to become unversioned; tables derived from that one are then unversioned too (until another table is explicitly materialized as versioned).
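The propagation rule can be sketched as a walk over a linear chain of operations. This is a simplified model under stated assumptions: the TableOp kinds and the VersionedStatus helper are hypothetical, and real topologies are graphs rather than chains.

```java
import java.util.List;

// Hypothetical operation kinds, chosen only to illustrate the rule above.
enum TableOp {
    MATERIALIZE_VERSIONED,     // explicit Materialized backed by a versioned store
    MATERIALIZE_UNVERSIONED,   // explicit Materialized backed by an unversioned store
    STATELESS,                 // e.g., filter or mapValues without a Materialized instance
    STATEFUL_IMPLICIT,         // aggregation or join without an explicit Materialized instance
    TO_STREAM_THEN_TO_TABLE    // toStream() followed by toTable() without a Materialized instance
}

final class VersionedStatus {
    // Returns whether the table at the end of the chain counts as versioned.
    static boolean isVersioned(List<TableOp> chain) {
        boolean versioned = false; // all tables start out unversioned
        for (TableOp op : chain) {
            switch (op) {
                case MATERIALIZE_VERSIONED:
                    versioned = true;  // the user opted in explicitly
                    break;
                case STATELESS:
                    break;             // status is inherited unchanged
                default:
                    versioned = false; // any of the listed conditions resets it
                    break;
            }
        }
        return versioned;
    }
}
```

An explicitly materialized stateful operation or toTable() call is modeled here as one of the two MATERIALIZE_* kinds, depending on the store type passed in.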

Stream-Table Joins Perform Timestamped Lookups on Versioned Tables

Today, when a new record (from the stream side) arrives at a stream-table join processor, the processor performs a get(key) lookup on the table materialization to find the record to join with. If the table store is versioned, the processor will instead call get(key, timestamp), where timestamp is the stream-side record's timestamp. This change provides proper temporal join semantics when users opt in to using versioned stores.

In the event that the stream-side record timestamp is older than the versioned store's history retention and get(key, timestamp) returns null, the join processor will handle this null in the same way as other nulls – for inner joins, no join result will be produced, whereas for left joins a join result with null table value will be produced. (The RocksDB-based versioned store implementation introduced in KIP-889 will log a warning on calls to get(key, timestamp) which have exceeded history retention, but there will be no other indication that this has occurred.)

For inner joins, this is the best option because joining the stream-side record with any other table-side record would result in incorrect temporal join semantics. For left joins, we could also consider dropping the stream-side record (for which there is no table-side record due to history retention having elapsed) entirely rather than emitting a join result with null. Both approaches have their merits: some use cases may require or prefer that stream-side records never be dropped from the join, while others may prefer to exclude such records from the join result. I think it's best to have the processor err on the side of including the records and allowing users to identify and filter them out downstream if needed, rather than the reverse, where users would have no way of getting the dropped records back. (There are also implementation challenges associated with having the processor determine whether a null returned from get(key, timestamp) is due to the store's history retention having been exceeded or not. Addressing these would be outside the scope of this KIP and would likely require a KIP of its own. If this alternative behavior turns out to be preferable, we can introduce a config for it in the future.)
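A toy model of the lookup and null handling described above follows. The class, its in-memory maps, and the simplified history-retention check (based on the maximum table timestamp seen so far) are assumptions made for illustration; the real processor delegates to the state store's get(key) or get(key, timestamp).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Toy model of a stream-table join lookup (hypothetical; not Kafka Streams code).
final class StreamTableJoinSketch {
    private final Map<String, String> latestByOffset = new HashMap<>();          // unversioned view
    private final Map<String, TreeMap<Long, String>> history = new HashMap<>();  // versioned view
    private final boolean versioned;
    private final long historyRetentionMs;
    private long observedStreamTime = Long.MIN_VALUE;

    StreamTableJoinSketch(boolean versioned, long historyRetentionMs) {
        this.versioned = versioned;
        this.historyRetentionMs = historyRetentionMs;
    }

    // Table-side update, applied in arrival (offset) order.
    void tablePut(String key, String value, long timestamp) {
        latestByOffset.put(key, value);
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        observedStreamTime = Math.max(observedStreamTime, timestamp);
    }

    // Lookup performed when a stream-side record arrives.
    private String lookup(String key, long streamTimestamp) {
        if (!versioned) {
            return latestByOffset.get(key); // models get(key): the timestamp is ignored
        }
        TreeMap<Long, String> versions = history.get(key);
        if (versions == null) {
            return null;
        }
        // Models get(key, timestamp): null once the timestamp exceeds history retention.
        if (streamTimestamp < observedStreamTime - historyRetentionMs) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(streamTimestamp);
        return entry == null ? null : entry.getValue();
    }

    // Returns the join result, or empty if no result is produced.
    Optional<String> join(String key, String streamValue, long streamTimestamp, boolean leftJoin) {
        String tableValue = lookup(key, streamTimestamp);
        if (tableValue == null && !leftJoin) {
            return Optional.empty(); // inner join: a null lookup produces no result
        }
        return Optional.of(streamValue + "+" + tableValue); // left join keeps the record, possibly with null
    }
}
```

In the unversioned case, a stream record joins with whichever table value arrived last, regardless of timestamps; in the versioned case it joins with the table value that was valid at the stream record's timestamp.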

Table-Table Joins Do Not Produce Join Results on Out-of-order Records from Versioned Tables

This change is more nuanced and warrants additional context.

Background

Today, primary-key table-table joins work as follows: whenever a new record arrives at the processor, a get(key) lookup is performed on the opposite table's materialization to find the record to join with, and the join result is emitted with the larger timestamp of the two records being joined. The latest (by offset) join result is always the join of the latest (by offset) records from each of the two tables.

If there are no out-of-order records, then the timestamps of the join results will be non-decreasing as well, which means that the latest-by-offset join result is also the latest-by-timestamp join result. If there are out-of-order records, it is no longer true that the join result timestamps will be non-decreasing; it is possible that consumers of the join results will see time "move backwards." When using non-versioned stores, this is (perhaps) acceptable since state store semantics are offset-based, and the join respects that -- a join result is produced from the latest-by-offset records from each table, and this join result is the latest-by-offset record in the result topic as well. 

Versioned stores aim to provide timestamp-based semantics instead. A table-table join operating on versioned tables (i.e., tables materialized with versioned stores) should join the latest-by-timestamp records from each table, and this resulting join record should be the latest-by-timestamp in the result topic. 

We cannot guarantee this without changes to the table-table join processor logic today. Consider the following example of A join B where both stores are versioned, and all records are for the same key:

Code Block
A: (timestamp = 0, value = a0)
A: (timestamp = 4, value = a4)
B: (timestamp = 2, value = b2) -> emits (a4, b2) with timestamp=max(4, 2)=4
B: (timestamp = 1, value = b1) -> emits (a4, b1) with timestamp=max(4, 1)=4 
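Under the proposed behavior, the out-of-order b1 record above would produce no join result. The sketch below models that for a primary-key inner join; VersionedTableJoinSketch and its per-key TreeMap histories are hypothetical stand-ins for the versioned stores, and tombstones are not handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy primary-key inner join over two versioned tables that skips out-of-order
// records (hypothetical; not Kafka Streams code).
final class VersionedTableJoinSketch {
    private final Map<String, TreeMap<Long, String>> left = new HashMap<>();
    private final Map<String, TreeMap<Long, String>> right = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    void onLeft(String key, String value, long ts) { process(left, right, key, value, ts, true); }

    void onRight(String key, String value, long ts) { process(right, left, key, value, ts, false); }

    private void process(Map<String, TreeMap<Long, String>> self, Map<String, TreeMap<Long, String>> other,
                         String key, String value, long ts, boolean isLeft) {
        TreeMap<Long, String> versions = self.computeIfAbsent(key, k -> new TreeMap<>());
        // A newer version already exists, so this record is out-of-order
        // (in versioned-store terms, the put would return a validTo timestamp).
        boolean outOfOrder = versions.higherKey(ts) != null;
        versions.put(ts, value); // the record is still added to the table's history
        if (outOfOrder) {
            return; // proposed behavior: no new join result
        }
        TreeMap<Long, String> otherVersions = other.get(key);
        if (otherVersions == null || otherVersions.isEmpty()) {
            return; // inner join: nothing to join with yet
        }
        Map.Entry<Long, String> latestOther = otherVersions.lastEntry(); // latest by timestamp
        String l = isLeft ? value : latestOther.getValue();
        String r = isLeft ? latestOther.getValue() : value;
        long resultTs = Math.max(ts, latestOther.getKey());
        emitted.add("(" + l + ", " + r + ")@" + resultTs);
    }
}
```

Replaying the example above yields only (a4, b2) with timestamp 4; the (a4, b1) result that today's processor emits is not produced.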

...

Code Block
Record (all with same key)   Effect on unversioned aggregate   Effect on versioned aggregate
-----------------------------------------------------------------------------------------------------------
(v1, ts=1)                   add: v1                           add: v1
(v2, ts=10)                  remove: v1 ; add: v2              remove: v1 ; add: v2
(v3, ts=5)                   remove: v2 ; add: v3              nothing -- v3 has an older timestamp than v2
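The versioned column of this table can be reproduced with a toy sum aggregate that uses the same subtract-old/add-new pattern. TableSumSketch is hypothetical and ignores tombstones and re-keying; it only illustrates which records reach the aggregate.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy table aggregation (a sum over all keys) using subtract-old/add-new updates
// (hypothetical; not Kafka Streams code).
final class TableSumSketch {
    private final boolean versioned;
    private final Map<String, Long> latestByOffset = new HashMap<>();
    private final Map<String, TreeMap<Long, Long>> history = new HashMap<>();
    private long sum = 0;

    TableSumSketch(boolean versioned) {
        this.versioned = versioned;
    }

    void onRecord(String key, long value, long ts) {
        if (versioned) {
            TreeMap<Long, Long> versions = history.computeIfAbsent(key, k -> new TreeMap<>());
            Map.Entry<Long, Long> latest = versions.lastEntry(); // latest by timestamp, before this put
            versions.put(ts, value);
            if (latest != null && latest.getKey() > ts) {
                return; // out-of-order: leave the aggregate unchanged
            }
            if (latest != null) {
                sum -= latest.getValue(); // remove the previous latest-by-timestamp value
            }
            sum += value;
        } else {
            Long old = latestByOffset.put(key, value);
            if (old != null) {
                sum -= old; // remove the previous latest-by-offset value
            }
            sum += value;
        }
    }

    long sum() {
        return sum;
    }
}
```

Feeding v1=1 (ts=1), v2=2 (ts=10), and v3=3 (ts=5) for one key leaves the unversioned sum at 3 and the versioned sum at 2, matching the last row of the table.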

Suppress Cannot Be Applied to Versioned Tables

The KTable#suppress() operation suppresses table updates (per key) and only forwards records downstream periodically. The purpose of a versioned table is to store a complete record history (subject to history retention time) for each key, so it's unclear what the semantics around suppressing updates should be.

If suppress is called on a versioned table, should the operator still collapse updates based on record offset as it does today? This would produce a versioned table with incomplete version history, and it's unclear what the use cases for this would be.

Should the operator collapse updates by keeping the latest by timestamp record instead? This would be consistent with the fact that other processors use timestamp-based semantics for versioned tables, but the value of suppressing updates in this way is limited as well. It amounts to dropping out-of-order records (per key) from the versioned table, which can be done more efficiently without using suppress and its associated buffer. 

Until we have a clearer understanding of possible use cases for suppressing a versioned table, it's best to disallow it in order to avoid surprising/confusing semantics.

Other Processors and the Long-Term Vision

The changes proposed to the processors above are necessary for coherent semantics when using versioned stores. This KIP does not propose changes to any of the other processors, and it's worth discussing why.

At the heart of the complexities around the table-table join semantics above is that it's not clear how tables should handle out-of-order records. Given that a KStream#toTable() method exists and that we fully expect streams to contain out-of-order records, it makes sense to support creating a table from a stream containing out-of-order records, particularly in light of versioned tables. But the possibility for tables with out-of-order records creates ambiguity for how table processors should handle these records. Rather than requiring each processor to handle these complexities (as loosely proposed above), I think it'd be better to introduce a "grace period" at table source nodes and in KStream#toTable() to reorder out-of-order records (per key) so that tables, at least the versioned ones, do not have to worry about out-of-order data at downstream processors. 

That said, this is not included in the scope of this KIP, since this KIP is focused only on the minimal processor updates required for versioned stores to "make sense" with existing processors today. Reordering out-of-order data at source table nodes is a larger change, and only nice-to-have rather than necessary.

In light of this long-term vision, this KIP does not propose changes to any of the other table processors either. Table operations such as filter, mapValues, and transformValues should not be in the business of re-ordering or dropping records, even if a table is materialized as a versioned table before or after any of these steps. (In fact, it's only because the existing filter implementation does drop records (tombstones) under certain situations that the filter processor needs an update in this KIP in the first place.) Table aggregation operations including count and reduce already produce results with non-decreasing timestamps, and therefore do not need updates at this time beyond ensuring that the proper records are included in the aggregation. And the requisite updates for table-table join processors have already been discussed above.

In the future if we update source table nodes to reorder out-of-order data for versioned tables, then this current proposed special-handling for versioned tables in the various table processors (table-table join, table aggregate, and table filter) will no longer be needed either. 

GlobalKTables Cannot Be Versioned

When KIP-889 introduced versioned stores, there seemed to be no reason to prevent GlobalKTables from being versioned, but thinking through processor semantics has now surfaced one. KStream-GlobalKTable joins today are not timestamp-aware: there is no guarantee on the processing order of stream-side records relative to records from the global table, because the threads consuming from each are separate and not synchronized. As a result, the benefit of table versioning in a KStream-GlobalKTable join is limited and also hard to reason about. In the interest of limiting surface area for now, let's disallow global tables from being versioned; we can always enable it later if needed.

If a user tries to pass a versioned state store as the materialization for a global table, a TopologyException will be thrown when building the topology, and the error message will make clear that this is why. 

Versioned Stores Return validTo Timestamp on Put

The purpose of this change is to provide more information about the outcome of a call to VersionedKeyValueStore#put(...). The return value will be the validTo timestamp of the newly put record, with two special values to be aware of:

  • -1, used throughout Streams to indicate "no timestamp," means that the record which was put is the latest record version for the particular key, and therefore there is no validTo timestamp
  • Long.MIN_VALUE will indicate that the grace period has elapsed, and the put did not take place.

This additional metadata about the result of the put enables processors to determine when a record is out-of-order, so that table-table joins and aggregates may drop out-of-order data as described above. The special Long.MIN_VALUE return value, indicating that the put did not take place, is included both to disambiguate this case from the -1 case and so that callers may choose to take different actions in this case.
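A toy in-memory store makes the three possible return values concrete. The ToyVersionedStore class, its constant names, and the simplified grace-period check (based on the maximum timestamp observed so far) are assumptions; this is not the RocksDB-based implementation from KIP-889.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the proposed put(...) return contract (hypothetical; not Kafka Streams code).
final class ToyVersionedStore {
    static final long NO_VALID_TO = -1L;        // record is the latest version for its key
    static final long NOT_PUT = Long.MIN_VALUE; // grace period elapsed; the put did not happen

    private final Map<String, TreeMap<Long, String>> history = new HashMap<>();
    private final long gracePeriodMs;
    private long observedStreamTime = Long.MIN_VALUE;

    ToyVersionedStore(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    long put(String key, String value, long timestamp) {
        // Simplified grace-period check against the maximum timestamp seen so far.
        if (observedStreamTime != Long.MIN_VALUE && timestamp < observedStreamTime - gracePeriodMs) {
            return NOT_PUT;
        }
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        TreeMap<Long, String> versions = history.computeIfAbsent(key, k -> new TreeMap<>());
        versions.put(timestamp, value);
        // The new record's validTo is the timestamp of the next newer version, if any.
        Long next = versions.higherKey(timestamp);
        return next == null ? NO_VALID_TO : next;
    }
}
```

For example, a caller could treat any return value other than -1 as an out-of-order (or rejected) record and skip emitting a new result.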

As part of updating the return type of VersionedKeyValueStore#put(...), a corresponding update to VersionedBytesStore is needed in order to pass the return value through when converting from KeyValueStore<Bytes, byte[]> back to VersionedKeyValueStore. This is largely an implementation detail since users are not expected to interact directly with VersionedBytesStore, and it is only included in this KIP because VersionedBytesStore is technically a public interface (it is required by VersionedBytesStoreSupplier, the return type of Stores#persistentVersionedKeyValueStore(...)).

A new method is added to VersionedBytesStore, rather than updating an existing one, because the existing put(key, value) method of VersionedBytesStore cannot be updated: its signature is inherited from KeyValueStore<Bytes, byte[]>. As a workaround, a new method with signature put(key, value, timestamp) is added instead. This new signature also aligns more closely with how VersionedKeyValueStore uses the method, which allows us to simplify the internal implementation.
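The constraint is a general Java one: an interface cannot redeclare an inherited method with a different return type, but it can add an overload with a different parameter list. The interfaces below are simplified, hypothetical stand-ins for KeyValueStore<Bytes, byte[]> and VersionedBytesStore, using String keys instead of Bytes.

```java
// Hypothetical stand-in for KeyValueStore<Bytes, byte[]>.
interface SimpleKeyValueStore<K, V> {
    void put(K key, V value);
}

// Hypothetical stand-in for VersionedBytesStore. Redeclaring the inherited method
// as `long put(String key, byte[] value)` would not compile, because it would
// override put(K, V) with an incompatible return type.
interface SimpleVersionedBytesStore extends SimpleKeyValueStore<String, byte[]> {
    // The extra timestamp parameter makes this a distinct overload,
    // so it is free to return the validTo timestamp.
    long put(String key, byte[] value, long timestamp);
}

// Minimal implementation showing that both methods coexist.
final class RecordingStore implements SimpleVersionedBytesStore {
    long lastTimestamp = -1L;

    @Override
    public void put(String key, byte[] value) {
        // Not exercised in this toy.
    }

    @Override
    public long put(String key, byte[] value, long timestamp) {
        lastTimestamp = timestamp;
        return -1L; // toy: always reports "latest version, no validTo"
    }
}
```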

Compatibility, Deprecation, and Migration Plan

...

Integration testing for the new types of processor functionality using versioned stores: validating results with different combinations of in-order and out-of-order data.

Rejected Alternatives

Implement the long-term vision directly

As described above, it'd be nice if – long term – table source nodes (including KStream#toTable()) implemented a "grace period" to use in re-ordering out-of-order records, at least for versioned tables, as this would obviate the need for discussion about how individual table processors should handle out-of-order data.

I'd like to ship KIP-889 and versioned stores without blocking on this as it's a larger, more complex change with follow-up design discussions needed, whereas versioned stores in their current form can already provide value to users. In order to ship KIP-889, there are some minimal processor changes required, and those are the only ones proposed in this KIP.

Have table-table joins on versioned tables produce an older join result on out-of-order data

See discussion above ("Attempt 1" and "Attempt 2"). Producing a complete, accurate history of older join results is costly. Producing an incomplete history without accuracy guarantees is not valuable to users.

Drop out-of-order records from versioned tables everywhere, not just in table-table joins and table aggregations

Rather than updating just the table-table join and table aggregate processors to not produce/update results on out-of-order data from versioned tables, we could consider extending this decision to "ignore" out-of-order data for all table processors when using versioned stores. I don't think we should do this because the out-of-order data is still valuable -- we might still want to keep it in the versioned table in certain situations. However, we do know that we don't want it for purposes of the table-table join (unless we want the table-table join to produce a complete, accurate history of older join results, which is discussed above) or table aggregate, which is why it's fine to drop out-of-order data at these specific processors.

If instead of dropping the out-of-order records entirely we re-order them, and if instead of doing this in every processor we do it once at the table source, then this suggestion is the same as the long-term vision proposed above.

Make these changes to the various processors configurable

The changes are only being applied to versioned stores, which are purely opt-in. If users don't want the changes, they don't have to opt in. If it turns out there are use cases for wanting versioned tables but not wanting these changes, we can discuss adding a config as a follow-up.

A more structured return type for VersionedKeyValueStore#put(...) 

Instead of returning a long with a special value to indicate whether the put call was accepted, we could return a structured type such as "PutMetadata" containing a long field (for the validTo timestamp) and a boolean (to indicate whether the put was accepted). This would allow more evolvability in the future, if it turns out there are other types of metadata that users have use for, but would also be more cumbersome in the meantime. Given that we don't anticipate additional pieces of metadata being valuable to return on put, let's keep the return type simple rather than building for evolvability that will likely go unused.

Additional stream-table join improvements, such as grace periods

...