...

Today, when a new record (from the stream side) arrives at a stream-table join processor, the processor performs a get(key) lookup on the table materialization to find the record to join with. If the table store is versioned, the processor will call get(key, timestamp) instead, where timestamp is the stream-side record's timestamp. This change provides proper temporal join semantics when users opt in to using versioned stores.

In the event that the stream-side record timestamp is older than the versioned store's history retention and get(key, timestamp) returns null, the join processor will handle this null in the same way as other nulls: for inner joins, no join result will be produced, whereas for left joins, a join result with a null table value will be produced. (The RocksDB-based versioned store implementation introduced in KIP-889 will log a warning on calls to get(key, timestamp) that exceed history retention, but there will be no other indication that this has occurred.)

For inner joins, this is the best option, because joining the stream-side record with any other table-side record would result in incorrect temporal join semantics. For left joins, we could also consider dropping the stream-side record (for which there is no table-side record because history retention has elapsed) entirely, rather than emitting a join result with null. Both approaches have their merits: some use cases may require or prefer that stream-side records never be dropped from the join, while others may prefer to exclude such records from the join result. I think it's best to have the processor err on the side of including the records and allowing users to identify and filter them out downstream if needed, rather than the reverse, where users would have no way of getting the dropped records back. (There are also implementation challenges in having the processor determine whether a null returned from get(key, timestamp) is due to the store's history retention having been exceeded. Addressing these would be outside the scope of this KIP and would likely require a KIP of its own. If this alternative behavior turns out to be preferable, we can introduce a config for it in the future.)
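
A toy model can make the lookup rule and the null handling concrete. The Python sketch below is a single-process illustration, not the Kafka Streams implementation: `VersionedTable`, its `get(key, timestamp=None)` method and `join_stream_record` are hypothetical names, and the sketch assumes that a timestamped lookup returns None whenever the timestamp is older than the store's observed stream time minus its history retention.

```python
from typing import Dict, Optional, Tuple


class VersionedTable:
    """Hypothetical toy versioned table for one store instance (not the Kafka Streams API)."""

    def __init__(self, history_retention: int) -> None:
        self.history_retention = history_retention
        self.observed_stream_time: Optional[int] = None
        self.versions: Dict[str, Dict[int, str]] = {}  # key -> {timestamp: value}

    def put(self, key: str, value: str, timestamp: int) -> None:
        if self.observed_stream_time is None or timestamp > self.observed_stream_time:
            self.observed_stream_time = timestamp
        self.versions.setdefault(key, {})[timestamp] = value

    def get(self, key: str, timestamp: Optional[int] = None) -> Optional[str]:
        history = self.versions.get(key, {})
        if not history:
            return None
        if timestamp is None:
            return history[max(history)]  # get(key): latest version by timestamp
        # Assumed model: lookups older than (stream time - history retention) return None.
        if timestamp < self.observed_stream_time - self.history_retention:
            return None
        eligible = [ts for ts in history if ts <= timestamp]
        return history[max(eligible)] if eligible else None


def join_stream_record(table: VersionedTable, key: str, stream_value: str,
                       stream_ts: int, left_join: bool) -> Optional[Tuple[str, Optional[str]]]:
    """Join one stream-side record using a timestamped lookup on the table side."""
    table_value = table.get(key, stream_ts)
    if table_value is None and not left_join:
        return None  # inner join: no result
    return (stream_value, table_value)  # left join: keep the record, possibly with None


if __name__ == "__main__":
    table = VersionedTable(history_retention=50)
    for value, ts in [("t0", 0), ("t10", 10), ("t100", 100)]:
        table.put("k", value, ts)
    print(join_stream_record(table, "k", "s60", 60, left_join=False))  # ('s60', 't10')
    print(join_stream_record(table, "k", "s5", 5, left_join=False))    # None
    print(join_stream_record(table, "k", "s5", 5, left_join=True))     # ('s5', None)
```

With a history retention of 50 and stream time at 100, the stream record at timestamp 5 falls outside retention, so the inner join produces nothing while the left join emits it with a null (None) table value.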

Table-Table Joins Do Not Produce Join Results on Out-of-order Records from Versioned Tables

This change is more nuanced and warrants additional context.

Background

Today, primary-key table-table joins work as follows: whenever a new record arrives at the processor, a get(key) lookup is performed on the opposite table's materialization to find the record to join with, and the join result is emitted with the larger timestamp of the two records being joined. The latest (by offset) join result is always the join of the latest (by offset) records from each of the two tables.

If there are no out-of-order records, then the timestamps of the join results will be non-decreasing as well, which means that the latest-by-offset join result is also the latest-by-timestamp join result. If there are out-of-order records, the join result timestamps are no longer guaranteed to be non-decreasing; consumers of the join results may see time "move backwards." When using non-versioned stores, this is (perhaps) acceptable, since state store semantics are offset-based and the join respects that: a join result is produced from the latest-by-offset records from each table, and this join result is the latest-by-offset record in the result topic as well.

Versioned stores aim to provide timestamp-based semantics instead. A table-table join operating on versioned tables (i.e., tables materialized with versioned stores) should join the latest-by-timestamp records from each table, and this resulting join record should be the latest-by-timestamp in the result topic. 

We cannot guarantee this without changes to the table-table join processor logic today. Consider the following example of A join B where both stores are versioned, and all records are for the same key:

A: (timestamp = 0, value = a0)
A: (timestamp = 4, value = a4)
B: (timestamp = 2, value = b2) -> emits (a4, b2) with timestamp=max(4, 2)=4
B: (timestamp = 1, value = b1) -> emits (a4, b1) with timestamp=max(4, 1)=4

The arrival of the out-of-order record on the B side has triggered a new join result (a4, b1) with the same timestamp as the previous join result (a4, b2), which means the new join result replaces the previous one, even though the previous one is the join of the latest-by-timestamp records from each table. There are a number of ways we can try to fix this.
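
The example above can be replayed against a simplified model of today's processor. This sketch assumes a single key, an inner join, and versioned stores whose get(key) returns the latest-by-timestamp version; `latest_lookup_join` and the (side, timestamp, value) record tuples are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, int, str]  # (side "A" or "B", timestamp, value)
Result = Tuple[str, str, int]  # (a_value, b_value, result timestamp)


def latest_lookup_join(records: List[Record]) -> List[Result]:
    """Simplified model of today's primary-key table-table inner join for one key."""
    latest: Dict[str, Optional[Tuple[int, str]]] = {"A": None, "B": None}
    results: List[Result] = []
    for side, ts, value in records:
        current = latest[side]
        if current is None or ts >= current[0]:
            latest[side] = (ts, value)  # versioned store: latest by timestamp
        other = latest["B" if side == "A" else "A"]
        if other is None:
            continue  # inner join: nothing to join with yet
        other_ts, other_value = other
        # The arriving record always joins with the other side's get(key) result.
        a, b = (value, other_value) if side == "A" else (other_value, value)
        results.append((a, b, max(ts, other_ts)))
    return results


if __name__ == "__main__":
    example = [("A", 0, "a0"), ("A", 4, "a4"), ("B", 2, "b2"), ("B", 1, "b1")]
    print(latest_lookup_join(example))
    # [('a4', 'b2', 4), ('a4', 'b1', 4)]
```

The out-of-order b1 produces (a4, b1) at timestamp 4, replacing (a4, b2), even though the B side's latest-by-timestamp record is still b2.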

Attempt 1

One idea is: when joining versioned tables, if the new record is not out-of-order, emit the latest join result; otherwise, perform a single timestamped lookup into the other store and emit an older result.

With this approach, the above example becomes:

A: (timestamp = 0, value = a0)
A: (timestamp = 4, value = a4)
B: (timestamp = 2, value = b2) -> latest record on B side, joins with latest on A side to emit (a4, b2) with timestamp=max(4, 2)=4
B: (timestamp = 1, value = b1) -> out-of-order record on B side, timestamped lookup on A side finds a0 to emit (a0, b1) with timestamp=max(0, 1)=1

This ensures that the latest (by timestamp) join result will indeed always be the join of the latest (by timestamp) records from each table, so the original concern is fixed. However, the value of the out-of-order join results emitted is dubious, as they are not guaranteed to be correct. Consider a different example:

A: (timestamp = 0, value = a0)
B: (timestamp = 0, value = b0) -> latest record on B side, emits (a0, b0) with timestamp=0
A: (timestamp = 4, value = a4) -> latest record on A side, emits (a4, b0) with timestamp=4
B: (timestamp = 3, value = b3) -> latest record on B side, emits (a4, b3) with timestamp=4
B: (timestamp = 2, value = b2) -> out-of-order record on B side, emits (a0, b2) with timestamp=2
A: (timestamp = 1, value = a1) -> out-of-order record on A side, emits (a1, b0) with timestamp=1, but now the (a0, b2) for timestamp=2 above is incorrect.

Everything was fine up until the last record arrived. The (a1, b0) join result triggered by the last record is correct, but the (a0, b2) result triggered by the previous record is now incorrect; it should be (a1, b2) instead. In order to properly emit (a1, b2) in addition to (a1, b0) when the a1 record arrives, we have to perform a scan for versions to join with in the B store, instead of performing a single timestamped lookup.
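
Attempt 1 can be modeled by keeping each side's versions in a dict from timestamp to value, so that out-of-order records can perform a timestamped lookup. In this toy, single-key sketch, `as_of`, `attempt1_join` and `true_join_at` are hypothetical names, and a record is assumed to be out-of-order when its timestamp is strictly older than the latest timestamp already seen on its side. Replaying the second example reproduces the stale (a0, b2) result:

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, int, str]  # (side "A" or "B", timestamp, value)
Result = Tuple[str, str, int]  # (a_value, b_value, result timestamp)


def as_of(history: Dict[int, str], ts: Optional[int]) -> Optional[Tuple[int, str]]:
    """Latest version with timestamp <= ts; ts=None means the overall latest."""
    eligible = [t for t in history if ts is None or t <= ts]
    if not eligible:
        return None
    t = max(eligible)
    return t, history[t]


def attempt1_join(records: List[Record]) -> List[Result]:
    stores: Dict[str, Dict[int, str]] = {"A": {}, "B": {}}
    results: List[Result] = []
    for side, ts, value in records:
        own = stores[side]
        out_of_order = bool(own) and ts < max(own)
        own[ts] = value
        other = stores["B" if side == "A" else "A"]
        # In-order record: join with the other side's latest version.
        # Out-of-order record: one timestamped lookup as of this record's timestamp.
        match = as_of(other, ts if out_of_order else None)
        if match is None:
            continue
        other_ts, other_value = match
        a, b = (value, other_value) if side == "A" else (other_value, value)
        results.append((a, b, max(ts, other_ts)))
    return results


def true_join_at(records: List[Record], ts: int) -> Optional[Tuple[str, str]]:
    """Join of both sides' versions as of ts, given every record seen so far."""
    a = as_of({t: v for s, t, v in records if s == "A"}, ts)
    b = as_of({t: v for s, t, v in records if s == "B"}, ts)
    return (a[1], b[1]) if a and b else None


if __name__ == "__main__":
    records = [("A", 0, "a0"), ("B", 0, "b0"), ("A", 4, "a4"),
               ("B", 3, "b3"), ("B", 2, "b2"), ("A", 1, "a1")]
    print(attempt1_join(records))
    # [('a0', 'b0', 0), ('a4', 'b0', 4), ('a4', 'b3', 4), ('a0', 'b2', 2), ('a1', 'b0', 1)]
    print(true_join_at(records, 2))  # ('a1', 'b2'), so the emitted (a0, b2) is stale
```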

Attempt 2

Suppose we want to guarantee that older join results are always accurate. As noted above, this requires performing a scan for old record versions in the versioned store. This certainly works, but it quickly becomes complicated, and the number of join results that need to be emitted grows quickly as well.

A: (timestamp = 0, value = a0)
A: (timestamp = 5, value = a5)
B: (timestamp = 2, value = b2) -> emits two join results: (a0, b2) for ts=2, and (a5, b2) for ts=5
B: (timestamp = 3, value = b3) -> emits two join results: (a0, b3) for ts=3, and (a5, b3) for ts=5
B: (timestamp = 4, value = b4) -> emits two join results: (a0, b4) for ts=4, and (a5, b4) for ts=5
A: (timestamp = 1, value = a1) -> emits three join results: (a1, b2) for ts=2, (a1, b3) for ts=3, and (a1, b4) for ts=4

... And we can see how the complexity grows rather quickly. In the future it could be nice to add an option for the join processor to emit this complete older version history for the join result, to support use cases which require it, but this volume of updates does not make sense as the default. Adding such an option is deferred to a future KIP.
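
A scan-based Attempt 2 can be sketched in the same toy model. It assumes each version is current from its own timestamp until the next newer version on the same side, and joins a new record with every other-side version whose validity interval overlaps its own. `validity` and `attempt2_join` are hypothetical names; superseded results are simply overwritten downstream, and no retractions are modeled.

```python
from typing import Dict, List, Tuple

Record = Tuple[str, int, str]  # (side "A" or "B", timestamp, value)
Result = Tuple[str, str, int]  # (a_value, b_value, result timestamp)


def validity(history: Dict[int, str], ts: int) -> Tuple[int, float]:
    """Interval [ts, next newer timestamp) during which the version at ts is current."""
    newer = [t for t in history if t > ts]
    return ts, (min(newer) if newer else float("inf"))


def attempt2_join(records: List[Record]) -> List[Result]:
    stores: Dict[str, Dict[int, str]] = {"A": {}, "B": {}}
    results: List[Result] = []
    for side, ts, value in records:
        stores[side][ts] = value
        start, end = validity(stores[side], ts)
        other = stores["B" if side == "A" else "A"]
        # Scan every other-side version whose validity overlaps [start, end).
        for other_ts in sorted(other):
            o_start, o_end = validity(other, other_ts)
            if o_start < end and o_end > start:
                other_value = other[other_ts]
                a, b = (value, other_value) if side == "A" else (other_value, value)
                results.append((a, b, max(start, o_start)))
    return results


if __name__ == "__main__":
    records = [("A", 0, "a0"), ("A", 5, "a5"), ("B", 2, "b2"),
               ("B", 3, "b3"), ("B", 4, "b4"), ("A", 1, "a1")]
    for result in attempt2_join(records):
        print(result)
```

Running it on the example above prints the same nine results listed in the text; each additional version on either side adds more overlapping intervals to scan and more results to emit.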

Proposal

Handling out-of-order records by performing a single timestamped lookup as in Attempt 1 does not guarantee correctness of the older join result, and performing a scan to guarantee correct older version history as in Attempt 2 is expensive. In light of this, I think it's best to simply not produce older join results for now. Specifically, when joining versioned tables, if the new record is not out-of-order, emit the latest join result; otherwise, emit nothing. The example above becomes:

A: (timestamp = 0, value = a0)
A: (timestamp = 5, value = a5)
B: (timestamp = 2, value = b2) -> latest record on B side, emits (a5, b2) with timestamp=5
B: (timestamp = 3, value = b3) -> latest record on B side, emits (a5, b3) with timestamp=5
B: (timestamp = 4, value = b4) -> latest record on B side, emits (a5, b4) with timestamp=5
A: (timestamp = 1, value = a1) -> out-of-order record on A side, no new join result emitted

A consequence of this is that the join result timestamps are once again guaranteed to be non-decreasing. The latest-by-timestamp join result is also the latest-by-offset result, which makes downstream consumption of the join results table suitable even for consumers which don't already handle versioning.

Note that it's still possible there will be older (by timestamp and by offset) join results in the join results topic, and these results are not guaranteed to be correct. The correctness guarantee only extends to the latest (by timestamp and offset) result. For example:

A: (timestamp = 0, value = a0)
B: (timestamp = 2, value = b2) -> latest record on B side, emits (a0, b2) with timestamp=2
A: (timestamp = 5, value = a5) -> latest record on A side, emits (a5, b2) with timestamp=5
A: (timestamp = 1, value = a1) -> out-of-order record on A side, no new join result emitted. The older (a0, b2) join result is no longer correct; at timestamp 2 the join should now be (a1, b2).

For users that require that all older join results are correct, we will have to introduce an option in the future for implementing Attempt 2 above.
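
The proposed rule is the simplest of the three to model. In this hypothetical single-key sketch (`proposed_join`), an out-of-order record is still written to its side's history but emits nothing, while an in-order record joins with the other side's latest version. As in the sketches for the two attempts, "out-of-order" is assumed to mean strictly older than the latest timestamp already seen on that side.

```python
from typing import Dict, List, Tuple

Record = Tuple[str, int, str]  # (side "A" or "B", timestamp, value)
Result = Tuple[str, str, int]  # (a_value, b_value, result timestamp)


def proposed_join(records: List[Record]) -> List[Result]:
    stores: Dict[str, Dict[int, str]] = {"A": {}, "B": {}}
    results: List[Result] = []
    for side, ts, value in records:
        own = stores[side]
        out_of_order = bool(own) and ts < max(own)
        own[ts] = value  # the version is still recorded in this side's history
        if out_of_order:
            continue  # proposal: no join result for out-of-order records
        other = stores["B" if side == "A" else "A"]
        if not other:
            continue  # inner join: nothing to join with yet
        other_ts = max(other)  # get(key): latest version by timestamp
        other_value = other[other_ts]
        a, b = (value, other_value) if side == "A" else (other_value, value)
        results.append((a, b, max(ts, other_ts)))
    return results


if __name__ == "__main__":
    records = [("A", 0, "a0"), ("B", 2, "b2"), ("A", 5, "a5"), ("A", 1, "a1")]
    print(proposed_join(records))  # [('a0', 'b2', 2), ('a5', 'b2', 5)]
```

Because only in-order records emit, and each emitted timestamp is the larger of the two sides' latest timestamps (neither of which ever decreases), the emitted timestamps are non-decreasing.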

The examples above discussed primary-key table-table joins, but the same reasoning and analysis extend to foreign-key joins, and this KIP proposes to make the analogous updates to foreign-key joins as well.

In the case of a table-table join where one side is versioned and the other is not, out-of-order records from the versioned side will not trigger a new join result, but out-of-order records from the unversioned side will. This is in line with the understanding that a versioned table follows timestamp-based semantics, while an unversioned table follows offset-based semantics. The join result guarantees that the latest-by-offset result is the join of the latest-by-timestamp record from the versioned side with the latest-by-offset record from the unversioned side, so it is valid to interpret the join result as an unversioned table. It is not recommended to interpret the join result of a join with mixed versioning as a versioned table, since there are no guarantees about the latest-by-timestamp result in the presence of out-of-order records from the unversioned source table.
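
The mixed-versioning behavior can be modeled by applying the out-of-order check only on the versioned side. In this hypothetical sketch (`mixed_join`, single key, inner join), the unversioned side keeps only its latest-by-offset record, so every record arriving there replaces it and triggers a join result regardless of its timestamp.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, int, str]  # (side "A" or "B", timestamp, value)
Result = Tuple[str, str, int]  # (a_value, b_value, result timestamp)


def mixed_join(records: List[Record], versioned: Dict[str, bool]) -> List[Result]:
    """Inner join for one key where each side is either versioned or unversioned."""
    latest: Dict[str, Optional[Tuple[int, str]]] = {"A": None, "B": None}
    results: List[Result] = []
    for side, ts, value in records:
        current = latest[side]
        if versioned[side] and current is not None and ts < current[0]:
            continue  # out-of-order on a versioned side: no join result (history not modeled)
        # Versioned side: latest by timestamp. Unversioned side: latest by offset,
        # so every arriving record replaces the previous one.
        latest[side] = (ts, value)
        other = latest["B" if side == "A" else "A"]
        if other is None:
            continue
        other_ts, other_value = other
        a, b = (value, other_value) if side == "A" else (other_value, value)
        results.append((a, b, max(ts, other_ts)))
    return results


if __name__ == "__main__":
    records = [("A", 5, "a5"), ("B", 3, "b3"), ("B", 1, "b1"), ("A", 2, "a2"), ("A", 6, "a6")]
    print(mixed_join(records, versioned={"A": True, "B": False}))
    # [('a5', 'b3', 5), ('a5', 'b1', 5), ('a6', 'b1', 6)]
```

Here the out-of-order b1 from the unversioned side still produces a result, while the out-of-order a2 from the versioned side does not. The final result (a6, b1) pairs the latest-by-timestamp A record with the latest-by-offset B record even though b3 is newer by timestamp, which is why the result should not be read as a versioned table.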

Finally, it's worth noting that even when joining versioned tables, the table-table join processors will only ever call get(key) and not get(key, timestamp). The table history retentions still have an indirect implication for this use case, though, since history retention doubles as the store's grace period today (subject to change in the future). Because grace period is per store instance, which has task-level granularity, if grace period is set too low, then the latest record for one key could be dropped from the store if another key has already advanced the store's observed stream time past the grace period by the time that this record is seen. In light of this, users should still set history retention high enough to capture records from different keys which may arrive out-of-order, even though get(key, timestamp) may never be called.
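
The grace-period interaction can be illustrated with a toy store that tracks a single observed stream time across all keys. This is a simplified, hypothetical model (`GracePeriodStore` is not a Kafka Streams class): a put is assumed to be dropped when its timestamp is older than the observed stream time minus history retention. With a retention of 50, the record for `k2` below is lost even though it is the only, and therefore latest, record for that key.

```python
from typing import Dict, Optional, Tuple


class GracePeriodStore:
    """Hypothetical model: one observed stream time per store instance, shared by all keys."""

    def __init__(self, history_retention: int) -> None:
        self.history_retention = history_retention
        self.observed_stream_time: Optional[int] = None
        self.latest: Dict[str, Tuple[int, str]] = {}

    def put(self, key: str, value: str, timestamp: int) -> bool:
        """Returns False when the record is dropped for exceeding the grace period."""
        if (self.observed_stream_time is not None
                and timestamp < self.observed_stream_time - self.history_retention):
            return False
        if self.observed_stream_time is None or timestamp > self.observed_stream_time:
            self.observed_stream_time = timestamp
        current = self.latest.get(key)
        if current is None or timestamp >= current[0]:
            self.latest[key] = (timestamp, value)
        return True

    def get(self, key: str) -> Optional[str]:
        current = self.latest.get(key)
        return current[1] if current is not None else None


if __name__ == "__main__":
    for retention in (50, 100):
        store = GracePeriodStore(history_retention=retention)
        store.put("k1", "v1", 100)  # k1 advances the store's stream time to 100
        store.put("k2", "v2", 10)   # first (and latest) record for k2
        print(retention, store.get("k2"))
    # 50 None
    # 100 v2
```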

Other Processors and the Long-Term Vision

...