...

In its current state, the stream-table join is inflexible in how it handles out-of-order data. We recently added versioned tables, which allow the table side of a join to be processed in a timestamp-aware manner, but joins do not yet take advantage of them. Right now the stream side can only be processed in offset order, as records arrive. However, a table record that is a better fit for a stream record's timestamp might show up later. This semantic gap leads to incorrect output in some cases, because each stream record is processed as soon as it arrives and is joined with whatever the latest version in the table is at that moment.

If the table side uses a materialized versioned store, it can store multiple versions of each record within its defined history retention period. This proposal would bring the stream side into alignment with that flexibility. By adding a grace period buffer to the stream side, the join can wait until it is certain that the table-side record it joins with is the right one for the stream record's timestamp.
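To make the table-side behavior concrete, here is a minimal Java sketch of a timestamp-aware table lookup. It is a simplified stand-in, not the Kafka Streams versioned store API: the class `VersionedStoreSketch` and its `put`/`get` methods are hypothetical, and it assumes that a lookup older than the newest observed timestamp minus the history retention simply returns null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified versioned table (not the Kafka Streams API).
 * Each key keeps a timestamp-ordered history of values; lookups that fall
 * before (newest observed timestamp - history retention) return null.
 */
public class VersionedStoreSketch {
    private final long historyRetention;
    private final Map<String, TreeMap<Long, String>> history = new HashMap<>();
    // Newest timestamp written so far; -1 means no records seen yet.
    private long observedStreamTime = -1L;

    public VersionedStoreSketch(long historyRetention) {
        this.historyRetention = historyRetention;
    }

    /** Records a new version of key that is valid from timestamp onward. */
    public void put(String key, String value, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Returns the value that was current for key at asOfTimestamp, or null. */
    public String get(String key, long asOfTimestamp) {
        if (asOfTimestamp < observedStreamTime - historyRetention) {
            return null; // older than the history retention window
        }
        TreeMap<Long, String> versions = history.get(key);
        if (versions == null) {
            return null; // key never written
        }
        // Latest version whose timestamp is <= asOfTimestamp.
        Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedStoreSketch table = new VersionedStoreSketch(10);
        table.put("2", "b", 1);
        table.put("2", "x", 2);
        System.out.println(table.get("2", 1)); // b
        System.out.println(table.get("2", 2)); // x
    }
}
```

With a history retention of 10, a lookup for key 2 as of timestamp 1 still returns b after x has arrived at timestamp 2; that older version is what a buffered stream side could take advantage of.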

...

Say we have a versioned table like this, with a table history retention time of 10:

Key | Value at TS1 | Value at TS2 | Value at TS3
1   | a            | a            | a
2   | b            | x            | x
3   | c            | c            | y

...

Without buffering the stream side, the join output could look like this:

Key | Join result
1   | da
2   | ex
3   | fy
2   | gx
3   | hy

Or, depending on the order in which stream and table records arrive, this is also a valid result:

Key | Join result
1   | da
2   | eb
3   | fc
2   | gb
3   | hc
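The difference between the two results comes down to which table version each stream record is joined with. The Java sketch below encodes the versioned table from the example and contrasts a latest-version lookup (what the unbuffered join does) with a timestamp-aware lookup. The stream record's timestamp is an assumption made only for illustration, since the example's stream input is not reproduced here, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical illustration of latest-version vs. timestamp-aware joins.
 * TABLE mirrors the versioned-table example; stream timestamps are assumed.
 */
public class JoinSemanticsSketch {
    // key -> (timestamp -> value), taken from the versioned table example
    static final Map<Integer, TreeMap<Long, String>> TABLE = new HashMap<>();

    static {
        TABLE.put(1, new TreeMap<>(Map.of(1L, "a")));
        TABLE.put(2, new TreeMap<>(Map.of(1L, "b", 2L, "x")));
        TABLE.put(3, new TreeMap<>(Map.of(1L, "c", 3L, "y")));
    }

    /** Unbuffered behavior: join with the newest version currently in the table. */
    static String joinLatest(int key, String streamValue) {
        return streamValue + TABLE.get(key).lastEntry().getValue();
    }

    /** Timestamp-aware behavior: join with the version valid at the stream timestamp. */
    static String joinAsOf(int key, String streamValue, long streamTimestamp) {
        Map.Entry<Long, String> version = TABLE.get(key).floorEntry(streamTimestamp);
        return version == null ? null : streamValue + version.getValue();
    }

    public static void main(String[] args) {
        // Assumption: stream record (2, "e") has timestamp 1 but is processed
        // after the table has already received version x at TS2.
        System.out.println(joinLatest(2, "e"));  // ex
        System.out.println(joinAsOf(2, "e", 1)); // eb
    }
}
```

Under that assumption, the unbuffered join produces "ex" as in the first result, while a timestamp-aware lookup produces "eb" as in the second; buffering the stream side gives the join the chance to pick the version that matches the record's timestamp.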

Public Interfaces

...

To allow users to configure the buffer time, we will add two new APIs. The grace period will be set on the Joined object using a Duration. The Joined object is currently used only by the stream-table join, and setting the grace period will be optional. The grace period affects only the stream-side buffer; the table history retention will remain unchanged.

If a grace period is not set, the join will execute as before, using the same logic in the stream-table join node. If a grace period of zero is set, the join will drop all out-of-order records on the stream side. If the grace period is non-zero, each stream record will enter a stream buffer and will be dequeued and joined once its timestamp is less than or equal to stream time minus the grace period, that is, once stream time has advanced at least one grace period past the record's timestamp.
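The three cases above can be sketched as a small stream-side buffer. This is a hypothetical illustration, not the Kafka Streams implementation: `GracePeriodBufferSketch`, `StreamRecord`, and `process` are made-up names, an empty `Optional` stands in for "no grace period set", stream time is assumed to be the largest stream-side timestamp seen so far, and the table lookup itself is left out (`process` only returns the records that are ready to be joined).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;

/** Hypothetical sketch of the proposed stream-side grace period semantics. */
public class GracePeriodBufferSketch {
    /** A stream-side record; the table lookup happens after it is released. */
    record StreamRecord(String key, String value, long timestamp) {}

    private final Optional<Duration> gracePeriod;
    // Buffered stream records, oldest timestamp first.
    private final PriorityQueue<StreamRecord> buffer =
            new PriorityQueue<>(Comparator.comparingLong(StreamRecord::timestamp));
    // Largest stream-side timestamp seen so far.
    private long streamTime = Long.MIN_VALUE;

    public GracePeriodBufferSketch(Optional<Duration> gracePeriod) {
        this.gracePeriod = gracePeriod;
    }

    /** Processes one record and returns the records now ready to be joined. */
    public List<StreamRecord> process(StreamRecord record) {
        List<StreamRecord> ready = new ArrayList<>();

        // No grace period set: join immediately, exactly as before.
        if (gracePeriod.isEmpty()) {
            ready.add(record);
            return ready;
        }

        boolean outOfOrder = record.timestamp() < streamTime;
        streamTime = Math.max(streamTime, record.timestamp());
        long graceMs = gracePeriod.get().toMillis();

        // Grace period of zero: drop out-of-order records, pass the rest through.
        if (graceMs == 0) {
            if (!outOfOrder) {
                ready.add(record);
            }
            return ready;
        }

        // Non-zero grace period: buffer the record, then release every buffered
        // record whose timestamp <= streamTime - gracePeriod, oldest first.
        buffer.add(record);
        while (!buffer.isEmpty() && buffer.peek().timestamp() <= streamTime - graceMs) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        GracePeriodBufferSketch join =
                new GracePeriodBufferSketch(Optional.of(Duration.ofMillis(10)));
        System.out.println(join.process(new StreamRecord("1", "d", 5)));  // []
        System.out.println(join.process(new StreamRecord("2", "e", 3)));  // []
        // Stream time reaches 16, so records at or below 6 are released: e@3, then d@5.
        System.out.println(join.process(new StreamRecord("3", "f", 16)));
    }
}
```

Ordering the buffer by timestamp means records are released oldest first even when they arrived out of order, so in the usage above e@3 is joined before d@5 once stream time reaches 16.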

...