...
No Format |
---|
table.suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000))).toStream(); // etc. |
Buffer Eviction Behavior (aka Suppress Emit Behavior)
I propose to offer the following constraints:
- at all times, the buffer enforces the #keys bound
- at all times, the buffer enforces the size (#bytes) bound
- at all times, the buffer enforces the time bound
Inserting a new record may violate any or all of the constraints, and advancing stream time may violate the time bound (the third constraint).
If any of the constraints are violated, the buffer will evict (and emit) records until all constraints are again satisfied.
In evicting (and emitting) records, the buffer will use the following eviction strategy:
- the oldest record (by timestamp) in the buffer gets evicted
In updating a record whose key is already in the buffer, offset order is respected.
This can be visualized as the following algorithm:
No Format |
---|
process(key, value, timestamp):
    buffer.insertOrUpdate(key -> (value, timestamp))
    while (buffer.constraintsViolated()) {
        (key -> (value, timestamp)) := buffer.evictOldest()
        emit(key, value, timestamp)
    } |
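The algorithm above can be sketched in Java as follows. This is a minimal, illustrative model and not the Kafka Streams implementation: the class `SuppressBufferSketch`, its `Entry` type, and the constructor parameters are hypothetical names. It assumes that a record's size is the number of characters in its value, that the time bound is violated once stream time minus the record timestamp reaches the limit, and that timestamp ties are broken by insertion order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SuppressBufferSketch {
    /** A buffered (or emitted) record. Hypothetical type for illustration. */
    static final class Entry {
        final String key;
        final String value;
        final long timestamp;
        Entry(String key, String value, long timestamp) {
            this.key = key; this.value = value; this.timestamp = timestamp;
        }
        @Override public String toString() {
            return "(" + key + "," + value + "," + timestamp + ")";
        }
    }

    private final int maxKeys;    // #keys bound
    private final long maxBytes;  // size bound; only value characters are counted (assumption)
    private final long maxAgeMs;  // time bound; violated once streamTime - timestamp >= maxAgeMs
    private final Map<String, Entry> buffer = new LinkedHashMap<>();
    private long streamTime = Long.MIN_VALUE;

    SuppressBufferSketch(int maxKeys, long maxBytes, long maxAgeMs) {
        this.maxKeys = maxKeys; this.maxBytes = maxBytes; this.maxAgeMs = maxAgeMs;
    }

    /** Inserts or overwrites the key, then evicts oldest-first until all bounds hold. */
    List<Entry> process(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        buffer.put(key, new Entry(key, value, timestamp)); // later offset wins for the same key
        List<Entry> emitted = new ArrayList<>();
        while (constraintsViolated()) {
            Entry oldest = oldest();
            buffer.remove(oldest.key);
            emitted.add(oldest);
        }
        return emitted;
    }

    /** Linear scan for the smallest timestamp; a real buffer would keep a time index. */
    private Entry oldest() {
        Entry min = null;
        for (Entry e : buffer.values()) {
            if (min == null || e.timestamp < min.timestamp) min = e;
        }
        return min;
    }

    private boolean constraintsViolated() {
        if (buffer.isEmpty()) return false;
        long bytes = 0;
        for (Entry e : buffer.values()) bytes += e.value.length();
        return buffer.size() > maxKeys
            || bytes > maxBytes
            || streamTime - oldest().timestamp >= maxAgeMs;
    }

    public static void main(String[] args) {
        // Key limit of 2; the size and time bounds are effectively disabled.
        SuppressBufferSketch b = new SuppressBufferSketch(2, Long.MAX_VALUE, Long.MAX_VALUE);
        b.process("A", "w", 0);
        b.process("A", "x", 1);
        b.process("B", "y", 2);
        System.out.println(b.process("C", "z", 3)); // prints [(A,x,1)]
    }
}
```

Scanning for the oldest entry keeps the sketch short. Because every constraint is checked in one loop, the same code path handles key, size, and time violations, which is the point of having a single eviction strategy.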
This spec will maintain offset ordering for all updates to a key, but may re-order events between keys.
Here are some examples to illustrate the dynamics:
Offset order is maintained when a key is updated, regardless of timestamp:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
---|---|---|---|---|---|---|---|
0 | A | x | 0 | (A,x,0) | --- | --- | |
1 | A | y | 1 | (A,y,1) | --- | --- | Subsequent update to A overwrites the prior value |
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
---|---|---|---|---|---|---|---|
0 | A | x | 1 | (A,x,1) | --- | --- | |
1 | A | w | 0 | (A,w,0) | --- | --- | Subsequent update to A overwrites the prior value (even though this is an earlier event by time) |
The behavior is straightforward with no late events
Enforcing a key limit of 2:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
---|---|---|---|---|---|---|---|
0 | A | w | 0 | (A,w,0) | --- | --- | |
1 | A | x | 1 | (A,x,1) | --- | --- | |
2 | B | y | 2 | (A,x,1) | (B,y,2) | --- | |
3 | C | z | 3 | (B,y,2) | (C,z,3) | (A,x,1) | A is the oldest when we violate the key constraint |
Enforcing a byte limit of 3 (each value character is one byte; keys are not counted):
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | xx | 0 | (A,xx,0) | --- | --- | --- | |
1 | A | yy | 1 | (A,yy,1) | --- | --- | --- | |
2 | B | zz | 2 | (B,zz,2) | --- | --- | (A,yy,1) | A is the oldest entry when the size constraint is violated, so we emit it. |
Enforcing "emit after 2ms":
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | w | 0 | (A,w,0) | --- | --- | --- | |
1 | A | x | 1 | (A,x,1) | --- | --- | --- | |
2 | B | y | 2 | (A,x,1) | (B,y,2) | --- | --- | |
3 | C | z | 3 | (B,y,2) | (C,z,3) | --- | (A,x,1) | The stream time is now 3, so we emit all the records up to time 1 (this is just A) |
Note: newly added late events can be immediately evicted.
Enforcing "emit after 2ms":
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | w | 3 | (A,w,3) | --- | --- | --- | |
1 | A | x | 1 | --- | --- | --- | (A,x,1) | The stream time is 3, and the timestamp of A is now 1, so we have to emit it. |
2 | B | y | 1 | --- | --- | --- | (B,y,1) | The stream time is 3, and the timestamp of B is 1, so we have to emit it. |
Likewise with a key constraint of 2:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | emit | notes |
---|---|---|---|---|---|---|---|
0 | A | w | 0 | (A,w,0) | --- | --- | |
1 | A | x | 1 | (A,x,1) | --- | --- | |
2 | B | y | 2 | (A,x,1) | (B,y,2) | --- | |
3 | C | z | 0 | (A,x,1) | (B,y,2) | (C,z,0) | Even though it is the most recently added, C is still the oldest event when the key constraint is violated |
And of course with a size constraint of 3 bytes as well:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | xx | 0 | (A,xx,0) | --- | --- | --- | |
1 | A | yy | 1 | (A,yy,1) | --- | --- | --- | |
2 | B | zz | 0 | (A,yy,1) | --- | --- | (B,zz,0) | Even though B is the most recently added event, it is still the oldest one by timestamp when the size constraint is violated |
Big records can push multiple events out of the buffer
Size constraint of 3 bytes:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | x | 0 | (A,x,0) | --- | --- | --- | |
1 | B | y | 1 | (A,x,0) | (B,y,1) | --- | --- | |
2 | C | zzz | 2 | (C,zzz,2) | --- | --- | (A,x,0),(B,y,1) | No other records can fit in the buffer with C, and A and B are both older than C |
In fact, events can be so big they don't fit in the buffer at all.
Still 3 bytes:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
0 | A | x | 0 | (A,x,0) | --- | --- | --- | |
1 | B | y | 1 | (A,x,0) | (B,y,1) | --- | --- | |
2 | C | zzzz | 2 | --- | --- | --- | (A,x,0),(B,y,1),(C,zzzz,2) | A and B are both older than C, so they must be emitted before C, and C itself doesn’t fit in the buffer, so it must then be immediately emitted. |
Rejected alternative: evicting by offset instead of timestamp.
This causes strange behavior when there is a time constraint involved:
offset | key | value | timestamp | buffer-slot-0 | buffer-slot-1 | buffer-slot-2 | emit | notes |
---|---|---|---|---|---|---|---|---|
1 | A | x | 2 | (A,x,2) | --- | --- | --- | |
2 | B | y | 1 | (A,x,2) | (B,y,1) | --- | --- | |
3 | C | z | 3 | (A,x,2) | (B,y,1) | (C,z,3) | --- | Even though B is old enough to emit, it’s not at the head of the queue, so we can’t emit it |
4 | C | zz | 4 | (C,zz,4) | --- | --- | (A,x,2),(B,y,1) | It’s now time to emit A, and once we do, it’s no longer blocking B, so we can emit it as well. |
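To make the head-of-line blocking in this table concrete, here is a small sketch of the rejected offset-ordered variant. It is only an illustration: `OffsetOrderEvictionSketch` and `Entry` are hypothetical names, a 2 ms time limit is assumed (the table does not state one, but its rows are consistent with it), and first-insertion order in a `LinkedHashMap` stands in for offset order.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OffsetOrderEvictionSketch {
    /** A buffered (or emitted) record. Hypothetical type for illustration. */
    static final class Entry {
        final String key;
        final String value;
        final long timestamp;
        Entry(String key, String value, long timestamp) {
            this.key = key; this.value = value; this.timestamp = timestamp;
        }
        @Override public String toString() {
            return "(" + key + "," + value + "," + timestamp + ")";
        }
    }

    private final long maxAgeMs;
    // Iterates in first-insertion order; updating a key keeps its queue position.
    private final Map<String, Entry> queue = new LinkedHashMap<>();
    private long streamTime = Long.MIN_VALUE;

    OffsetOrderEvictionSketch(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    /** Buffers the record, then emits expired records from the head of the queue only. */
    List<Entry> process(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        queue.put(key, new Entry(key, value, timestamp));
        List<Entry> emitted = new ArrayList<>();
        Iterator<Entry> it = queue.values().iterator();
        while (it.hasNext()) {
            Entry head = it.next();
            if (streamTime - head.timestamp < maxAgeMs) {
                break; // head is not expired, so everything behind it must wait
            }
            it.remove();
            emitted.add(head);
        }
        return emitted;
    }

    public static void main(String[] args) {
        OffsetOrderEvictionSketch q = new OffsetOrderEvictionSketch(2);
        System.out.println(q.process("A", "x", 2));  // prints []
        System.out.println(q.process("B", "y", 1));  // prints []
        System.out.println(q.process("C", "z", 3));  // prints [] (B is expired but blocked by A)
        System.out.println(q.process("C", "zz", 4)); // prints [(A,x,2), (B,y,1)]
    }
}
```

At stream time 3, B has already exceeded the limit but cannot be emitted until A reaches the limit at stream time 4, which is the behavior this alternative was rejected for.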
Rejected alternative: evicting by timestamp only when the buffer is time constrained, and using offset order otherwise
Having just one eviction strategy simplifies everything: documentation, explanations, the code, the testing, etc.
I'd also argue that neither strategy is any more or less surprising for key- or size-constrained buffers. All the "flushing" behaviors above are also present in offset-ordered eviction.
Time-based eviction is perfectly legal, as we are only required to maintain a partial order over the partition (order only needs to be maintained within each key). Such partial reordering happens when we repartition anyway.
Rejected alternative (maybe a future option): "reset the timeout" when updating a key:
The current behavior is that, if you specify some timeout, say 5 minutes, any record that gets buffered is guaranteed to be emitted at the 5 minute mark, regardless of how often it's updated. In other words, it is emitted 5 minutes after the first update.
An alternative is to effectively reset the timer on each update, so the record would be emitted 5 minutes after the last update. This behavior is analogous to session windowing; it allows suppression to turn a high-frequency "run" of updates to a record into a single emitted record.
This was rejected in the current API simply because it may suppress a record forever, if that record is updated with a frequency higher than the suppression timeout.
For the default behavior, it seemed better to go with the one that is guaranteed to emit after the specified timeout.
But this could easily be added as an alternative in the future, if there is demand for it.
Compatibility, Deprecation, and Migration Plan
...