Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

No Format
public interface Suppressed<K> {

/**
 * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
 * enforce the time bound and never emit early.
 */
interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {}

/**
 * Marker interface for a buffer configuration that will strictly enforce size constraints
 * (bytes and/or number of records) on the buffer, so it is suitable for reducing duplicate
 * results downstream, but does not promise to eliminate them entirely.
 */
interface EagerBufferConfig extends BufferConfig<EagerBufferConfig> {}

interface BufferConfig<BC extends BufferConfig<BC>> {

        /**
         * Create a size-constrained buffer in terms of the maximum number of keys it will store.
         */
        static BufferConfig<?>EagerBufferConfig maxRecords(final long recordLimit);

        /**
         * Set a size constraint on the buffer in terms of the maximum number of keys it will store.
         */
        BC withMaxRecords(final long recordLimit);

        /**
         * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
         */
        static BufferConfig<?>EagerBufferConfig maxBytes(final long byteLimit);

        /**
         * Set a size constraint on the buffer, the maximum number of bytes it will use.
         */
        BC withMaxBytes(final long byteLimit);

        /**
         * Create a buffer unconstrained by size (either keys or bytes).
         *
         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
         *
         * If there isn't enough heap available to meet the demand, the application will encounter an
         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
         *
         * This is a convenient option if you doubt that your buffer will be that large, but also don't
         * wish to pick particular constraints, such as in testing.
         *
         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
         * It will never emit early.
         */
        static StrictBufferConfig unbounded();

        /**
         * Set the buffer to be unconstrained by size (either keys or bytes).
         *
         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
         *
         * If there isn't enough heap available to meet the demand, the application will encounter an
         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
         *
         * This is a convenient option if you doubt that your buffer will be that large, but also don't
         * wish to pick particular constraints, such as in testing.
         *
         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
         * It will never emit early.
         */
        StrictBufferConfig withNoBound();

        /**
         * Set the buffer to gracefully shut down the application when any of its constraints are violated
         *
         * This buffer is "strict" in the sense that it will enforce the time bound or shut down.
         * It will never emit early.
         */
        StrictBufferConfig shutDownWhenFull();

        /**
         * Sets the buffer to use on-disk storage if it requires more memory than the constraints allow.
         *
         * This buffer is "strict" in the sense that it will never emit early.
         */
        StrictBufferConfig spillToDiskWhenFull();

        /**
         * Set the buffer to just emit the oldest records when any of its constraints are violated.
         *
         * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
         * duplicate results downstream, but does not promise to eliminate them.
         */
        BufferConfigEagerBufferConfig emitEarlyWhenFull();
}

/**
 * Configure the suppression to emit only the "final results" from the window.
 *
 * By default all Streams operators emit results whenever new results are available.
 * This includes windowed operations.
 *
 * This configuration will instead emit just one result per key for each window, guaranteeing
 * to deliver only the final result. This option is suitable for use cases in which the business logic
 * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
 *
 * To accomplish this, the operator will buffer events from the window until the window close (that is,
 * until the end-time passes, and additionally until the grace period expires). Since windowed operators
 * are required to reject late events for a window whose grace period is expired, there is an additional
 * guarantee that the final results emitted from this suppression are eventually consistent with the upstream
 * operator and its queriable state, if enabled.
 *
 * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
 * This is required to be a "strict" config, since it would violate the "final results"
 * property to emit early and then issue an update later.
 * @param <K> The key type for the KTable to apply this suppression to. "Final results" mode is only available
 * on Windowed KTables (this is enforced by the type parameter).
 * @return a "final results" mode suppression configuration
 */
static Suppressed<Windowed> untilWindowCloses(final StrictBufferConfig bufferConfig);

/**
 * Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record
 * before emitting it further downstream. If another record for the same key arrives in the mean time, it replaces
 * the first record in the buffer but does <em>not</em> re-start the timer.
 *
 * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
 * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
 * @param <K> The key type for the KTable to apply this suppression to.
 * @return a suppression configuration
 */
 static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig);


/**
 * Use the specified name for the suppression node in the topology.
 * <p>
 * This can be used to insert a suppression without changing the rest of the topology names
 * (and therefore not requiring an application reset).
 * <p>
 * Note however, that once a suppression has buffered some records, removing it from the topology would cause
 * the loss of those records.
 * <p>
 * A suppression can be "disabled" with the configuration {@code untilTimeLimit(Duration.ZERO, ...}.
 *
 * @param name The name to be used for the suppression node and changelog topic
 * @return The same configuration with the addition of the given {@code name}.
 */
 Suppressed<K> withName(final String name);
}

...

Offset order is maintained when a key is updated, regardless of timestamp:

offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1emitnotes
0Ax0(A,x,0)------
1Ay1(A,x,1)------Subsequent update to A overwrites the prior value


offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1emitnotes
0Ax1(A,x,1)------
1Aw0(A,w,0)------Subsequent update to A overwrites the prior value (even though this is an earlier event by time)


The behavior is straightforward with no late events

Enforcing a key limit of 2:

offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1emitnotes
0Aw0(A,w,0)------
1Ax1(A,x,1)------
2By2(A,x,1)(B,y,2)---
3Cz3(B,y,2)(C,z,3)(A,x,1)A is the oldest when we violate the key constraint


Enforcing a byte limit of 3 (each character is one byte)

offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1buffer-slot-2emitnotes
0Axx0(A,xx,0)---------
1Ayy1(A,yy,1)---------
2Bzz2(B,zz,2)------(A,yy,1)A is the oldest entry when the size constraint is violated, so we emit it.


Enforcing "emit after 2ms":

offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1buffer-slot-2emitnotes
0Aw0(A,w,0)---------
1Ax1(A,x,1)---------
2By2(A,x,1)(B,y,2)------
3Cz3(B,y,2)(C,z,3)---(A,x,1)The stream time is now 3, so we emit all the records up to time 1 (this is just A)


Note: newly added late events can be immediately evicted. 

Enforcing "emit after 2ms":

offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1buffer-slot-2emitnotes
0Aw3(A,w,3)---------
1Ax1---------(A,x,1)The stream time is 3, and the timestamp of A is now 1, so we have to emit it.
2By1---------(B,y,1)The stream time is 3, and the timestamp of B is 1, so we have to emit it.


Likewise with a key constraint of 2:

offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1emitnotes
0Aw0(A,w,0)------
1Ax1(A,x,1)------
2By2(A,x,1)(B,y,2)---
3Cz0(A,x,1)(B,y,2)(C,z,0)Even though it is the most recently added, C is still the oldest event when the key constraint is violated


And of course with a size constraint of 3 bytes as well:

offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1buffer-slot-2emitnotes
0Axx0(A,xx,0)---------
1Ayy1(A,yy,1)---------
2Bzz0(A,yy,1)------(B,zz,0)Even though B is the most recently added event, it is still the oldest one by timestamp when the size constraint is violated


Big records can push multiple events out of the buffer

Size constraint of 3 bytes:

offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1buffer-slot-2emitnotes
0Ax0(A,x,0)---------
1By1(A,x,0)(B,y,1)------
2Czzz2(C,zzz,2)------(A,x,0),(B,y,1)No other records can fit in the buffer with C, and A and B are both older than C

In fact, events can be so big they don't fit in the buffer at all.

Still 3 bytes:

offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1buffer-slot-2emitnotes
0Ax0(A,x,0)---------
1By1(A,x,0)(B,y,1)------
2Czzzz2---------(A,x,0),(B,y,1),(C,zzzz,2)A and B are both older than C, so they must be emitted before C, and C itself doesn’t fit in the buffer, so it must then be immediately emitted.



Rejected alternative: evicting by offset instead of timestamp.

This causes strange behavior when there is a time constraint involved:

offsetkeyvaluetimestampbuffer-slot-0buffer-slot-1buffer-slot-2emitnotes
1Ax2(A,x,2)---------
2By1(A,x,2)(B,y,1)------
3Cz3(A,x,2)(B,y,1)(C,z,3)---Even though we B is old enough to emit, it’s not at the head of the queue, so we can’t emit it
4Czz4(C,zz,4)------(A,x,2),(B,y,1)It’s now time to emit A, and once we do, it’s no longer blocking B, so we can emit it as well.

Rejected alternative: evicting by timestamp only when the buffer is time constrained, and using offset order otherwise

...