Windowed computations in particular present a unique opportunity. The output of a windowed computation is a KTable in which the keys are annotated with window ids. Because the window has a defined end, we know that the keys belonging to that window can never be modified again once that end time has passed, except by out-of-order events. If we bound the lateness (the "maximum allowed delay") of events, we can know the "final value" of the window. This is especially useful for taking irrevocable actions in response to window results, such as sending an email or alert when a windowed aggregation is below a certain threshold.


  1. All but the final result for windows. You can use suppress to get exactly one final result per window/key for windowed computations. This includes both time and session windows.
    1. This feature requires adding a "grace period" parameter for windows.
  2. Intermediate Events. You can use suppress to wait for more updates on the same key before emitting them downstream. There are two main configuration knobs to tune here:
    1. How long to wait for more updates before emitting. This is an amount of time, measured either from the event time (for regular KTables) or from the window start (for windowed KTables), to buffer up each key before emitting it downstream.
    2. How much memory to use for buffering events (and what to do when it fills up). Suppress needs to remember the last observed event for each key, which takes memory. You can configure the amount of memory either by number of keys or by raw heap size. You can configure what to do when the buffer runs out of memory:
      1. Emit when full. This option is basically "best effort" suppression. If suppress runs out of memory, it will emit the oldest event.
      2. Spill to disk. This option is for when you need suppression to be strict. If the operator runs out of memory, it will allocate a RocksDB database and begin storing new records there. This may be complicated to implement, and consequently might be a future addition.
      3. Shut down. There is an option to run with the assumption that your "live set" of suppression events will fit within memory. If this assumption turns out to be false, the application will run out of memory and shut down. This may sound strange, but it's no different than using an InMemoryKeyValueStore or even an ArrayList. In the case of suppress, you can either leave the memory buffer unconstrained, which will cause an application crash if it runs out of memory, or you can configure it with a memory constraint and tell it to shut down gracefully when it runs out. Either one is safe from a correctness standpoint, but the latter will result in faster recovery. 
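
As a rough illustration of the "emit when full" and "shut down" options above, the following self-contained sketch models a buffer limited by number of keys. `KeyLimitedBuffer` and `WhenFull` are hypothetical names made up for this example and are not part of the Kafka Streams API. Spilling to disk is left out, and "oldest" is assumed to mean the lowest record timestamp.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a key-limited suppression buffer (not the Kafka Streams implementation).
final class KeyLimitedBuffer {
    // What to do when the key limit is exceeded.
    enum WhenFull { EMIT_EARLY, SHUT_DOWN }

    private static final class Entry {
        final String key;
        final String value;
        final long timestamp;

        Entry(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "(" + key + "," + value + "," + timestamp + ")";
        }
    }

    private final Map<String, Entry> byKey = new HashMap<>();
    private final int maxKeys;
    private final WhenFull whenFull;

    KeyLimitedBuffer(int maxKeys, WhenFull whenFull) {
        this.maxKeys = maxKeys;
        this.whenFull = whenFull;
    }

    // Buffers the latest update for a key and returns any records emitted early.
    List<String> put(String key, String value, long timestamp) {
        byKey.put(key, new Entry(key, value, timestamp)); // a newer update replaces the older one
        List<String> emitted = new ArrayList<>();
        while (byKey.size() > maxKeys) {
            if (whenFull == WhenFull.SHUT_DOWN) {
                // Strict mode: fail loudly rather than emit an intermediate result.
                throw new IllegalStateException("buffer exceeded " + maxKeys + " keys");
            }
            // Best-effort mode: evict the entry with the lowest timestamp (assumed meaning of "oldest").
            Entry oldest = Collections.min(byKey.values(),
                    Comparator.comparingLong((Entry e) -> e.timestamp));
            byKey.remove(oldest.key);
            emitted.add(oldest.toString());
        }
        return emitted;
    }

    int size() {
        return byKey.size();
    }
}
```

With a limit of two keys in `EMIT_EARLY` mode, a third key forces out the entry with the lowest timestamp. In `SHUT_DOWN` mode the same situation raises an exception instead, which is the property a "final results" use case relies on.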


To resolve this conflict, we're adding a new concept to the window spec: grace period. This is an amount of time that the window should accept out-of-order events after the window ends. After the grace period passes, the window is considered "closed" and will never be updated again. The grace period places a lower-bound constraint on the retention time, but otherwise has no implication on retention.
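
The relationship between window end, grace period, and "closed" can be sketched in a few lines. `GraceWindow` is a hypothetical helper written for this illustration, not a Kafka Streams class. It assumes the window end is exclusive, that a window closes once stream time reaches the end plus the grace period, and that the retention lower bound is the window size plus the grace period.

```java
import java.time.Duration;

// Hypothetical helper (not part of Kafka Streams) modeling when a window is "closed".
final class GraceWindow {
    final long startMs;
    final long endMs;   // exclusive end of the window (assumption)
    final long graceMs;

    GraceWindow(long startMs, Duration size, Duration grace) {
        this.startMs = startMs;
        this.endMs = startMs + size.toMillis();
        this.graceMs = grace.toMillis();
    }

    // Assumption: the window closes once stream time reaches end + grace.
    boolean isClosed(long streamTimeMs) {
        return streamTimeMs >= endMs + graceMs;
    }

    // A record is accepted if its timestamp falls in the window and the window is still open.
    boolean accepts(long recordTimestampMs, long streamTimeMs) {
        boolean inWindow = recordTimestampMs >= startMs && recordTimestampMs < endMs;
        return inWindow && !isClosed(streamTimeMs);
    }

    // Assumed lower bound on retention: state must be kept at least until the window closes.
    static long minimumRetentionMs(Duration size, Duration grace) {
        return size.toMillis() + grace.toMillis();
    }
}
```

For a 10 ms window starting at 0 with a 5 ms grace period, a record with timestamp 3 is still accepted at stream time 12 but rejected at stream time 15, when the window is closed.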


public interface Suppressed<K> {

/**
 * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
 * enforce the time bound and never emit early.
 */
interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {}

/**
 * Marker interface for a buffer configuration that will strictly enforce size constraints
 * (bytes and/or number of records) on the buffer, so it is suitable for reducing duplicate
 * results downstream, but does not promise to eliminate them entirely.
 */
interface EagerBufferConfig extends BufferConfig<EagerBufferConfig> {}

interface BufferConfig<BC extends BufferConfig<BC>> {

        /**
         * Create a size-constrained buffer in terms of the maximum number of keys it will store.
         */
        static EagerBufferConfig maxRecords(final long recordLimit);

        /**
         * Set a size constraint on the buffer in terms of the maximum number of keys it will store.
         */
        BC withMaxRecords(final long recordLimit);

        /**
         * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
         */
        static EagerBufferConfig maxBytes(final long byteLimit);

        /**
         * Set a size constraint on the buffer in terms of the maximum number of bytes it will use.
         */
        BC withMaxBytes(final long byteLimit);

        /**
         * Create a buffer unconstrained by size (either keys or bytes).
         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
         * If there isn't enough heap available to meet the demand, the application will encounter an
         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
         * This is a convenient option if you doubt that your buffer will be that large, but also don't
         * wish to pick particular constraints, such as in testing.
         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
         * It will never emit early.
         */
        static StrictBufferConfig unbounded();

        /**
         * Set the buffer to be unconstrained by size (either keys or bytes).
         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
         * If there isn't enough heap available to meet the demand, the application will encounter an
         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
         * This is a convenient option if you doubt that your buffer will be that large, but also don't
         * wish to pick particular constraints, such as in testing.
         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
         * It will never emit early.
         */
        StrictBufferConfig withNoBound();

        /**
         * Set the buffer to gracefully shut down the application when any of its constraints are violated.
         * This buffer is "strict" in the sense that it will enforce the time bound or shut down.
         * It will never emit early.
         */
        StrictBufferConfig shutDownWhenFull();

        /**
         * Set the buffer to use on-disk storage if it requires more memory than the constraints allow.
         * This buffer is "strict" in the sense that it will never emit early.
         */
        StrictBufferConfig spillToDiskWhenFull();

        /**
         * Set the buffer to just emit the oldest records when any of its constraints are violated.
         * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
         * duplicate results downstream, but does not promise to eliminate them.
         */
        EagerBufferConfig emitEarlyWhenFull();
}

/**
 * Configure the suppression to emit only the "final results" from the window.
 * By default all Streams operators emit results whenever new results are available.
 * This includes windowed operations.
 * This configuration will instead emit just one result per key for each window, guaranteeing
 * to deliver only the final result. This option is suitable for use cases in which the business logic
 * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
 * To accomplish this, the operator will buffer events from the window until the window closes (that is,
 * until the end-time passes, and additionally until the grace period expires). Since windowed operators
 * are required to reject late events for a window whose grace period is expired, there is an additional
 * guarantee that the final results emitted from this suppression are eventually consistent with the upstream
 * operator and its queryable state, if enabled.
 * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
 * This is required to be a "strict" config, since it would violate the "final results"
 * property to emit early and then issue an update later.
 * @param <K> The key type for the KTable to apply this suppression to. "Final results" mode is only available
 * on Windowed KTables (this is enforced by the type parameter).
 * @return a "final results" mode suppression configuration
 */
static Suppressed<Windowed> untilWindowCloses(final StrictBufferConfig bufferConfig);

/**
 * Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record
 * before emitting it further downstream. If another record for the same key arrives in the meantime, it replaces
 * the first record in the buffer but does <em>not</em> re-start the timer.
 * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
 * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
 * @param <K> The key type for the KTable to apply this suppression to.
 * @return a suppression configuration
 */
static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig);

/**
 * Use the specified name for the suppression node in the topology.
 * <p>
 * This can be used to insert a suppression without changing the rest of the topology names
 * (and therefore not requiring an application reset).
 * <p>
 * Note however, that once a suppression has buffered some records, removing it from the topology would cause
 * the loss of those records.
 * <p>
 * A suppression can be "disabled" with the configuration {@code untilTimeLimit(Duration.ZERO, ...)}.
 * @param name The name to be used for the suppression node and changelog topic
 * @return The same configuration with the addition of the given {@code name}.
 */
Suppressed<K> withName(final String name);
}


Offset order is maintained when a key is updated, regardless of timestamp:

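| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 1 | A | y | 1 | (A,y,1) | --- | Subsequent update to A overwrites the prior value |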

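| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 1 | A | w | 0 | (A,w,0) | --- | Subsequent update to A overwrites the prior value (even though this is an earlier event by time) |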

The behavior is straightforward with no late events

Enforcing a key limit of 2:

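| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 3 | C | z | 3 | (B,y,2), (C,z,3) | (A,x,1) | A is the oldest when we violate the key constraint |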

Enforcing a byte limit of 3 (each character is one byte)

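| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 2 | B | zz | 2 | (B,zz,2) | (A,yy,1) | A is the oldest entry when the size constraint is violated, so we emit it. |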

Enforcing "emit after 2ms":

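| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 3 | C | z | 3 | (B,y,2), (C,z,3) | (A,x,1) | The stream time is now 3, so we emit all the records up to time 1 (this is just A) |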
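
The "emit after 2ms" rule can be modeled with a small simulation. `TimeLimitedBuffer` is a hypothetical class written for this illustration, not the Kafka Streams implementation. It assumes stream time is the highest timestamp seen so far, that a record is due once its timestamp is at or before stream time minus the wait, and that an update replaces both the value and the timestamp held for the key, as the example rows here show.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a time-bounded suppression buffer (illustration only).
final class TimeLimitedBuffer {
    private static final class Entry {
        final String key;
        final String value;
        final long timestamp;

        Entry(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "(" + key + "," + value + "," + timestamp + ")";
        }
    }

    private final Map<String, Entry> byKey = new HashMap<>();
    private final long waitMs;
    private long streamTime = Long.MIN_VALUE;

    TimeLimitedBuffer(long waitMs) {
        this.waitMs = waitMs;
    }

    // Buffers the record, advances stream time, and returns every record that is now due.
    List<String> put(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        byKey.put(key, new Entry(key, value, timestamp));

        // Collect everything whose timestamp is at or before (stream time - wait).
        List<Entry> due = new ArrayList<>();
        for (Entry e : byKey.values()) {
            if (e.timestamp <= streamTime - waitMs) {
                due.add(e);
            }
        }
        due.sort(Comparator.comparingLong((Entry e) -> e.timestamp));

        List<String> emitted = new ArrayList<>();
        for (Entry e : due) {
            byKey.remove(e.key);
            emitted.add(e.toString());
        }
        return emitted;
    }
}
```

Feeding in (A,x,1), (B,y,2), (C,z,3) with a 2 ms wait emits only (A,x,1) on the third record. A record that arrives with timestamp 1 after stream time has reached 3 is already due, so it is emitted in the same call that adds it.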

Note: newly added late events can be immediately evicted. 

Enforcing "emit after 2ms":

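| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 1 | A | x | 1 | --- | (A,x,1) | The stream time is 3, and the timestamp of A is now 1, so we have to emit it. |
| 2 | B | y | 1 | --- | (B,y,1) | The stream time is 3, and the timestamp of B is 1, so we have to emit it. |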

Likewise with a key constraint of 2:

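| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 3 | C | z | 0 | (A,x,1), (B,y,2) | (C,z,0) | Even though it is the most recently added, C is still the oldest event when the key constraint is violated |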

And of course with a size constraint of 3 bytes as well:

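| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 2 | B | zz | 0 | (A,yy,1) | (B,zz,0) | Even though B is the most recently added event, it is still the oldest one by timestamp when the size constraint is violated |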

Big records can push multiple events out of the buffer

Size constraint of 3 bytes:

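| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 2 | C | zzz | 2 | (C,zzz,2) | (A,x,0), (B,y,1) | No other records can fit in the buffer with C, and A and B are both older than C |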

In fact, events can be so big they don't fit in the buffer at all.

Still 3 bytes:

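| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 2 | C | zzzz | 2 | --- | (A,x,0), (B,y,1), (C,zzzz,2) | A and B are both older than C, so they must be emitted before C, and C itself doesn’t fit in the buffer, so it must then be immediately emitted. |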
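
The byte-limit cases, including records too large for the buffer, can be reproduced with a short simulation. `ByteLimitedBuffer` is a hypothetical class for illustration only. It assumes, as in the examples, that only the value counts toward the size and that each character is one byte.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a byte-limited buffer that evicts by timestamp (illustration only).
final class ByteLimitedBuffer {
    private static final class Entry {
        final String key;
        final String value;
        final long timestamp;

        Entry(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "(" + key + "," + value + "," + timestamp + ")";
        }
    }

    private final Map<String, Entry> byKey = new HashMap<>();
    private final long maxBytes;
    private long usedBytes = 0;

    ByteLimitedBuffer(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Buffers the record, then emits lowest-timestamp entries until the size constraint holds again.
    List<String> put(String key, String value, long timestamp) {
        Entry previous = byKey.put(key, new Entry(key, value, timestamp));
        if (previous != null) {
            usedBytes -= previous.value.length(); // the overwritten value no longer takes space
        }
        usedBytes += value.length(); // assumption: one byte per character, values only

        List<String> emitted = new ArrayList<>();
        while (usedBytes > maxBytes) {
            Entry oldest = Collections.min(byKey.values(),
                    Comparator.comparingLong((Entry e) -> e.timestamp));
            byKey.remove(oldest.key);
            usedBytes -= oldest.value.length();
            emitted.add(oldest.toString());
        }
        return emitted;
    }
}
```

With a 3-byte limit, adding (C,zzz,2) after (A,x,0) and (B,y,1) pushes out A and then B. Adding (C,zzzz,2) instead pushes out A, B, and finally C itself, because no eviction of older records can make room for it.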

Rejected alternative: evicting by offset instead of timestamp.

This causes strange behavior when there is a time constraint involved:

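| Offset | Key | Value | Timestamp | Buffer | Emitted | Note |
|---|---|---|---|---|---|---|
| 3 | C | z | 3 | (A,x,2), (B,y,1), (C,z,3) | --- | Even though B is old enough to emit, it’s not at the head of the queue, so we can’t emit it |
| 4 | C | zz | 4 | (C,zz,4) | (A,x,2), (B,y,1) | It’s now time to emit A, and once we do, it’s no longer blocking B, so we can emit it as well. |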
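
The head-of-queue blocking in this rejected alternative can be seen in a small model. `OffsetOrderedBuffer` is a hypothetical class for illustration. It assumes a 2 ms wait and that only the entry at the head of the queue (the earliest inserted key) may be emitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical model of the rejected design: emit strictly in offset (insertion) order.
final class OffsetOrderedBuffer {
    private static final class Entry {
        final String key;
        final String value;
        final long timestamp;

        Entry(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "(" + key + "," + value + "," + timestamp + ")";
        }
    }

    // LinkedHashMap keeps insertion order; re-inserting an existing key keeps its position.
    private final LinkedHashMap<String, Entry> queue = new LinkedHashMap<>();
    private final long waitMs;
    private long streamTime = Long.MIN_VALUE;

    OffsetOrderedBuffer(long waitMs) {
        this.waitMs = waitMs;
    }

    List<String> put(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        queue.put(key, new Entry(key, value, timestamp));

        List<String> emitted = new ArrayList<>();
        Iterator<Entry> it = queue.values().iterator();
        while (it.hasNext()) {
            Entry head = it.next();
            if (head.timestamp > streamTime - waitMs) {
                break; // the head is not due yet, so everything behind it is blocked
            }
            it.remove();
            emitted.add(head.toString());
        }
        return emitted;
    }
}
```

After (A,x,2), (B,y,1), (C,z,3), nothing is emitted even though B is due, because A sits ahead of it. Only when stream time reaches 4 does A become due, which releases B as well.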

Rejected alternative: evicting by timestamp only when the buffer is time constrained, and using offset order otherwise
