...

table
  .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000)))
  .toStream(); // etc.

Buffer Eviction Behavior (aka Suppress Emit Behavior)

I propose to offer the following constraints:

  1. at all times, the buffer enforces the #keys bound
  2. at all times, the buffer enforces the size (#bytes) bound
  3. at all times, the buffer enforces the time bound

Inserting a new record may violate any or all of the constraints, and advancing stream time may violate constraint 3.

If any of the constraints are violated, the buffer will evict (and emit) records until all constraints are again satisfied.

In evicting (and emitting) records, the buffer will use the following eviction strategy:

  • the oldest record (by timestamp) in the buffer gets evicted

When updating a record whose key is already in the buffer, offset order is respected.


This can be visualized as the following algorithm:

process(key, value, timestamp):
  buffer.insertOrUpdate(key -> (value, timestamp))
  while(buffer.constraintsViolated()) {
    (key -> (value, timestamp)) := buffer.evictOldest()
    emit(key, value, timestamp)
  }


This spec will maintain offset ordering for all updates to a key, but may re-order events between keys.
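
The algorithm above can be sketched in Java roughly as follows. This is only an illustration under stated assumptions, not the Kafka Streams implementation: the class name `SuppressionBufferSketch`, its constructor parameters, and the string format of emitted records are all hypothetical. It assumes non-negative timestamps, counts one byte per value character (keys are not counted, which matches the size examples in this section), treats a record as expired once stream time minus its timestamp reaches the time limit, and, following the example tables, lets an update replace both the value and the timestamp.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed semantics; not the Kafka Streams implementation.
class SuppressionBufferSketch {
    // Latest buffered state for one key; seq records arrival order and only breaks timestamp ties.
    private static final class Entry {
        final String key; final String value; final long timestamp; final long seq;
        Entry(String key, String value, long timestamp, long seq) {
            this.key = key; this.value = value; this.timestamp = timestamp; this.seq = seq;
        }
    }

    private final int maxKeys;
    private final long maxBytes;
    private final long timeLimitMs;
    private final Map<String, Entry> buffer = new HashMap<>();
    private long streamTime = 0L; // assumption: timestamps are non-negative
    private long nextSeq = 0L;

    // Pass Integer.MAX_VALUE / Long.MAX_VALUE to leave a bound effectively unconstrained.
    SuppressionBufferSketch(int maxKeys, long maxBytes, long timeLimitMs) {
        this.maxKeys = maxKeys;
        this.maxBytes = maxBytes;
        this.timeLimitMs = timeLimitMs;
    }

    // Buffers the record, then evicts oldest-first until every bound holds; returns what was emitted.
    List<String> process(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        // An update to a buffered key replaces its value and timestamp.
        buffer.put(key, new Entry(key, value, timestamp, nextSeq++));
        List<String> emitted = new ArrayList<>();
        while (constraintsViolated()) {
            Entry e = evictOldest();
            emitted.add("(" + e.key + "," + e.value + "," + e.timestamp + ")");
        }
        return emitted;
    }

    private boolean constraintsViolated() {
        if (buffer.isEmpty()) return false;
        long bytes = 0L;
        long minTimestamp = Long.MAX_VALUE;
        for (Entry e : buffer.values()) {
            bytes += e.value.length(); // assumption: one byte per value character, keys not counted
            minTimestamp = Math.min(minTimestamp, e.timestamp);
        }
        return buffer.size() > maxKeys
            || bytes > maxBytes
            || streamTime - minTimestamp >= timeLimitMs;
    }

    // Removes and returns the entry with the smallest timestamp (earliest arrival wins ties).
    private Entry evictOldest() {
        Entry oldest = buffer.values().stream()
            .min(Comparator.comparingLong((Entry e) -> e.timestamp).thenComparingLong(e -> e.seq))
            .get();
        buffer.remove(oldest.key);
        return oldest;
    }
}
```

Replaying the rows of any example table in this section through `process` should reproduce its emit column; for instance, with only a key limit of 2, the sequence (A,w,0), (A,x,1), (B,y,2), (C,z,3) emits (A,x,1) on the last call.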

Here are some examples to illustrate the dynamics:

Offset order is maintained when a key is updated, regardless of timestamp:

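| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
|---|---|---|---|---|---|---|---|
| 0 | A | x | 0 | (A,x,0) | --- | --- | |
| 1 | A | y | 1 | (A,y,1) | --- | --- | Subsequent update to A overwrites the prior value |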


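| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
|---|---|---|---|---|---|---|---|
| 0 | A | x | 1 | (A,x,1) | --- | --- | |
| 1 | A | w | 0 | (A,w,0) | --- | --- | Subsequent update to A overwrites the prior value (even though this is an earlier event by time) |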


The behavior is straightforward with no late events

Enforcing a key limit of 2:

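| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
|---|---|---|---|---|---|---|---|
| 0 | A | w | 0 | (A,w,0) | --- | --- | |
| 1 | A | x | 1 | (A,x,1) | --- | --- | |
| 2 | B | y | 2 | (A,x,1) | (B,y,2) | --- | |
| 3 | C | z | 3 | (B,y,2) | (C,z,3) | (A,x,1) | A is the oldest when we violate the key constraint |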


Enforcing a byte limit of 3 (each character is one byte)

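| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
|---|---|---|---|---|---|---|---|---|
| 0 | A | xx | 0 | (A,xx,0) | --- | --- | --- | |
| 1 | A | yy | 1 | (A,yy,1) | --- | --- | --- | |
| 2 | B | zz | 2 | (B,zz,2) | --- | --- | (A,yy,1) | A is the oldest entry when the size constraint is violated, so we emit it. |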


Enforcing "emit after 2ms":

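| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
|---|---|---|---|---|---|---|---|---|
| 0 | A | w | 0 | (A,w,0) | --- | --- | --- | |
| 1 | A | x | 1 | (A,x,1) | --- | --- | --- | |
| 2 | B | y | 2 | (A,x,1) | (B,y,2) | --- | --- | |
| 3 | C | z | 3 | (B,y,2) | (C,z,3) | --- | (A,x,1) | The stream time is now 3, so we emit all the records up to time 1 (this is just A) |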


Note: newly added late events can be immediately evicted. 

Enforcing "emit after 2ms":

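| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
|---|---|---|---|---|---|---|---|---|
| 0 | A | w | 3 | (A,w,3) | --- | --- | --- | |
| 1 | A | x | 1 | --- | --- | --- | (A,x,1) | The stream time is 3, and the timestamp of A is now 1, so we have to emit it. |
| 2 | B | y | 1 | --- | --- | --- | (B,y,1) | The stream time is 3, and the timestamp of B is 1, so we have to emit it. |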


Likewise with a key constraint of 2:

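| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
|---|---|---|---|---|---|---|---|
| 0 | A | w | 0 | (A,w,0) | --- | --- | |
| 1 | A | x | 1 | (A,x,1) | --- | --- | |
| 2 | B | y | 2 | (A,x,1) | (B,y,2) | --- | |
| 3 | C | z | 0 | (A,x,1) | (B,y,2) | (C,z,0) | Even though it is the most recently added, C is still the oldest event when the key constraint is violated |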


And of course with a size constraint of 3 bytes as well:

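| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
|---|---|---|---|---|---|---|---|---|
| 0 | A | xx | 0 | (A,xx,0) | --- | --- | --- | |
| 1 | A | yy | 1 | (A,yy,1) | --- | --- | --- | |
| 2 | B | zz | 0 | (A,yy,1) | --- | --- | (B,zz,0) | Even though B is the most recently added event, it is still the oldest one by timestamp when the size constraint is violated |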


Big records can push multiple events out of the buffer

Size constraint of 3 bytes:

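| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
|---|---|---|---|---|---|---|---|---|
| 0 | A | x | 0 | (A,x,0) | --- | --- | --- | |
| 1 | B | y | 1 | (A,x,0) | (B,y,1) | --- | --- | |
| 2 | C | zzz | 2 | (C,zzz,2) | --- | --- | (A,x,0),(B,y,1) | No other records can fit in the buffer with C, and A and B are both older than C |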

In fact, events can be so big they don't fit in the buffer at all.

Still 3 bytes:

| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
|---|---|---|---|---|---|---|---|---|
| 0 | A | x | 0 | (A,x,0) | --- | --- | --- | |
| 1 | B | y | 1 | (A,x,0) | (B,y,1) | --- | --- | |
| 2 | C | zzzz | 2 | --- | --- | --- | (A,x,0),(B,y,1),(C,zzzz,2) | A and B are both older than C, so they must be emitted before C, and C itself doesn’t fit in the buffer, so it must then be immediately emitted. |



Rejected alternative: evicting by offset instead of timestamp.

This causes strange behavior when there is a time constraint involved:

| offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
|---|---|---|---|---|---|---|---|---|
| 1 | A | x | 2 | (A,x,2) | --- | --- | --- | |
| 2 | B | y | 1 | (A,x,2) | (B,y,1) | --- | --- | |
| 3 | C | z | 3 | (A,x,2) | (B,y,1) | (C,z,3) | --- | Even though B is old enough to emit, it’s not at the head of the queue, so we can’t emit it |
| 4 | C | zz | 4 | (C,zz,4) | --- | --- | (A,x,2),(B,y,1) | It’s now time to emit A, and once we do, it’s no longer blocking B, so we can emit it as well. |
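
The head-of-line blocking in this rejected alternative can be reproduced with a small Java sketch. It is a hypothetical illustration, not code from Kafka Streams: the class `OffsetOrderEvictionSketch` is invented here, and the 2 ms time limit is an assumption inferred from the rows above, since the table does not state it. The buffer keeps keys in the order they first arrived and only ever evicts from the head.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the REJECTED offset-ordered eviction under a time limit.
class OffsetOrderEvictionSketch {
    private static final class Buffered {
        final String value; final long timestamp;
        Buffered(String value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    private final long timeLimitMs;
    // LinkedHashMap iterates in first-insertion order; re-putting a key keeps its position.
    private final Map<String, Buffered> queue = new LinkedHashMap<>();
    private long streamTime = 0L; // assumption: timestamps are non-negative

    OffsetOrderEvictionSketch(long timeLimitMs) {
        this.timeLimitMs = timeLimitMs;
    }

    // Buffers the record, then emits expired records from the head of the queue only.
    List<String> process(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        queue.put(key, new Buffered(value, timestamp));
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Buffered>> it = queue.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Buffered> head = it.next();
            if (streamTime - head.getValue().timestamp < timeLimitMs) {
                break; // head has not expired, so everything queued behind it must wait
            }
            emitted.add("(" + head.getKey() + "," + head.getValue().value + ","
                + head.getValue().timestamp + ")");
            it.remove();
        }
        return emitted;
    }
}
```

Feeding the four rows of the table through `process` emits nothing at offset 3, even though B has already expired, and then emits A followed by B at offset 4. With timestamp-ordered eviction, B would leave the buffer at offset 3.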

Rejected alternative: evicting by timestamp only when the buffer is time constrained, and using offset order otherwise

Having just one eviction strategy simplifies everything: documentation, explanations, the code, the testing, etc.

I'd also argue that neither strategy is any more or less surprising for key- or size-constrained buffers. All the "flushing" behaviors above are also present in offset-ordered eviction.

Time-based eviction is perfectly legal, as we are only required to maintain a partial order over the partition (order only needs to be maintained within each key). Such partial reordering happens when we repartition anyway.

Rejected alternative (maybe a future option): "reset the timeout" when updating a key:

The current behavior is that, if you specify some timeout, say 5 minutes, any record that gets buffered is guaranteed to be emitted at the 5 minute mark, regardless of how often it's updated. That is, it is emitted 5 minutes after the first update.

An alternative is to effectively reset the timer on each update, so the record would be emitted 5 minutes after the last update. This behavior is analogous to session windowing; it allows suppression to turn a high-frequency "run" of updates to a record into a single emitted record.

This was rejected in the current API simply because it may suppress a record forever, if that record is updated with a frequency higher than the suppression timeout.

For the default behavior, it seemed better to go with the one that is guaranteed to emit after the specified timeout.

But this could easily be added as an alternative in the future, if there is demand for it.
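
To make the difference between the two timeout policies concrete, here is a minimal Java sketch. The class and method names are hypothetical, and the sketch models only the emit deadline for a single key given its update times (sorted ascending, in arbitrary time units); it is not part of the proposed API.

```java
// Hypothetical sketch comparing the two timeout policies for a single key.
class EmitDeadlineSketch {
    // Current behavior: the deadline is fixed by the first buffered update.
    static long emitTimeFixed(long[] updateTimes, long timeout) {
        return updateTimes[0] + timeout;
    }

    // Rejected alternative: every update that arrives before the pending deadline pushes it back.
    static long emitTimeReset(long[] updateTimes, long timeout) {
        long deadline = updateTimes[0] + timeout;
        for (int i = 1; i < updateTimes.length; i++) {
            if (updateTimes[i] >= deadline) {
                break; // the record was already emitted before this update arrived
            }
            deadline = updateTimes[i] + timeout;
        }
        return deadline;
    }
}
```

With a timeout of 5 and updates at times 0, 4, 8, and 12, the fixed policy emits at time 5, while the reset policy has pushed the deadline to 17 and would keep pushing it for as long as updates arrive less than 5 apart.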

Compatibility, Deprecation, and Migration Plan

...