...

  • A grace period on the window. This would be a balance of how late you expect events to be and how long you're willing to wait before finalizing the window and emitting the result.
  • Suppress with "emitFinalResultsOnlyuntilWindowClose". There's a specific "SuppressSuppressed" configuration for this feature. It allows you to configure the memory buffer and decide whether to shut down or switch to disk.

...

  • Emit-when-full memory strategy
  • The Suppress with "untilWindowClose": the inverse of the desired rate. If the desired rate is R (events per second), the inverse is the buffering time: 1/R (seconds per event).

...

  • Unlimited memory (either with spill-to-disk or just not limiting the memory)
  • The Suppress with "untilWindowClose": the inverse of the desired rate. If the desired rate is R (events per second), the inverse is the buffering time: 1/R (seconds per event).

...

No Format
public interface Suppressed<K> {

    /**
     * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
     * enforce the time bound and never emit early.
     */
    interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {

    }

    interface BufferConfig<BC extends BufferConfig<BC>> {
        /**
         * Create a size-constrained buffer in terms of the maximum number of keys it will store.
         */
        static BufferConfig<?> maxRecords(final long recordLimit);

        /**
         * Set a size constraint on the buffer in terms of the maximum number of keys it will store.
         */
        BC withMaxRecords(final long recordLimit);

        /**
         * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
         */
        static BufferConfig<?> maxBytes(final long byteLimit);

        /**
         * Set a size constraint on the buffer, the maximum number of bytes it will use.
         */
        BC withMaxBytes(final long byteLimit);

        /**
         * Create a buffer unconstrained by size (either keys or bytes).
         *
         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
         *
         * If there isn't enough heap available to meet the demand, the application will encounter an
         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
         *
         * This is a convenient option if you doubt that your buffer will be that large, but also don't
         * wish to pick particular constraints, such as in testing.
         *
         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
         * It will never emit early.
         */
        static StrictBufferConfig unbounded();

        /**
         * Set the buffer to be unconstrained by size (either keys or bytes).
         *
         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
         *
         * If there isn't enough heap available to meet the demand, the application will encounter an
         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
         *
         * This is a convenient option if you doubt that your buffer will be that large, but also don't
         * wish to pick particular constraints, such as in testing.
         *
         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
         * It will never emit early.
         */
        StrictBufferConfig withNoBound();

        /**
         * Set the buffer to gracefully shut down the application when any of its constraints are violated.
         *
         * This buffer is "strict" in the sense that it will enforce the time bound or shut down.
         * It will never emit early.
         */
        StrictBufferConfig shutDownWhenFull();

        /**
         * Set the buffer to use on-disk storage if it requires more memory than the constraints allow.
         *
         * This buffer is "strict" in the sense that it will never emit early.
         */
        StrictBufferConfig spillToDiskWhenFull();

        /**
         * Set the buffer to just emit the oldest records when any of its constraints are violated.
         *
         * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
         * duplicate results downstream, but does not promise to eliminate them.
         */
        BufferConfig emitEarlyWhenFull();
    }

    /**
     * Configure the suppression to emit only the "final results" from the window.
     *
     * By default, all Streams operators emit results whenever new results are available.
     * This includes windowed operations.
     *
     * This configuration will instead emit just one result per key for each window, guaranteeing
     * to deliver only the final result. This option is suitable for use cases in which the business logic
     * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
     *
     * To accomplish this, the operator will buffer events from the window until the window close (that is,
     * until the end-time passes, and additionally until the grace period expires). Since windowed operators
     * are required to reject late events for a window whose grace period has expired, there is an additional
     * guarantee that the final results emitted from this suppression are eventually consistent with the upstream
     * operator and its queryable state, if enabled.
     *
     * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
     *                     This is required to be a "strict" config, since it would violate the "final results"
     *                     property to emit early and then issue an update later.
     * @param <K> The key type for the KTable to apply this suppression to. "Final results" mode is only available
     *            on Windowed KTables (this is enforced by the type parameter).
     * @return a "final results" mode suppression configuration
     */
    static <K extends Windowed> Suppressed<K> untilWindowCloses(final StrictBufferConfig bufferConfig);

    /**
     * Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record
     * before emitting it further downstream. If another record for the same key arrives in the meantime, it replaces
     * the first record in the buffer but does <em>not</em> re-start the timer.
     *
     * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
     * @param <K> The key type for the KTable to apply this suppression to.
     * @return a suppression configuration
     */
    static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents);

    /**
     * Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record
     * before emitting it further downstream, bounding the buffer with the given {@code bufferConfig}. If another
     * record for the same key arrives in the meantime, it replaces the first record in the buffer but does
     * <em>not</em> re-start the timer.
     *
     * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
     * @param <K> The key type for the KTable to apply this suppression to.
     * @return a suppression configuration
     */
    static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig);
}
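
Note that the self-referential type parameter on BufferConfig ("BC extends BufferConfig<BC>") lets each "with" method return the concrete config type, so strictness is tracked at compile time. For example (illustrative, based on the signatures above):

No Format
// Compiles: shutDownWhenFull() returns a StrictBufferConfig, which never emits early.
windowCounts.suppress(untilWindowCloses(maxBytes(5_000_000).shutDownWhenFull()));

// Would not compile: emitEarlyWhenFull() returns a plain BufferConfig, which may emit
// early and would therefore violate the "final results" guarantee:
// windowCounts.suppress(untilWindowCloses(maxBytes(5_000_000).emitEarlyWhenFull()));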


Metrics

Along with the suppression operator, we will add several metrics. Note that suppress will not add to the skipped-records metrics. "Skipped" records are records that are invalid for one reason or another. "Suppressed" records are intentionally dropped, just like filtered records. The same applies to events that arrive later than the grace period for their windows.

Note: I'm not proposing roll-up metrics for these. They would be reported at the processor-node level. I suspect this is actually fine, and roll-ups can easily be added later if necessary.

Metrics we'll add:

  • late records (new records older than the grace period) are currently metered as "skipped-records" and logged at WARN level. As noted, this is not correct, so we will change the logs to DEBUG level and add new metrics: 
    • average and max observed lateness of all records: to help configure the grace period
      • (INFO) record-lateness-[avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of dropped events for closed windows
      • (INFO) late-record-drop-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
  • expired records (new records for segments older than the state store's retention period) are currently not metered and logged at DEBUG level. Since the grace period is currently equivalent to the retention period, this should currently be rare, as such events would never reach the state store but be marked as skipped and dropped in the processor. However, since we are deprecating `Windows.until` and defining retention only on the state store, it would become much more common. Therefore, we'll add some new state store metrics:
    • rate and total of events for expired windows
      • (INFO) expired-window-record-drop-[rate | total] type=stream-[storeType]-state-metrics client-id=<threadId> task-id=<taskId> [storeType]-state-id=[storeName]
  • intermediate event suppression
    • current, average, and peak intermediate suppression buffer size
      • (INFO) suppression-mem-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • current, average, and peak intermediate suppression disk buffer size (only present when using the "SPILL_TO_DISK" buffer-full strategy)
      • (INFO) suppression-disk-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • intermediate suppression buffer eviction rate and total: to track how often events are emitted early (only present when using the "EMIT" buffer-full strategy)
      • (INFO) suppression-mem-buffer-evict-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of intermediate result suppressions
      • (INFO) intermediate-result-suppression-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • min and average intermediate suppression overtime: to determine whether the configured time limit is delaying results longer than necessary. This metric may be unnecessary, since it's equivalent to (timeToWaitForMoreEventsConfig - observedLatenessMetric).
      • (INFO) intermediate-result-suppression-overtime-[min | avg] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
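
These metrics would all be exposed through the existing KafkaStreams#metrics() endpoint. For example, a sketch of reading the proposed record-lateness-avg metric from a running application (the metric and group names are the ones proposed above, not yet released API):

No Format
// Sketch: inspect the proposed lateness metric to help tune the grace period.
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

streams.metrics().forEach((name, metric) -> {
    if (name.group().equals("stream-processor-node-metrics")
            && name.name().equals("record-lateness-avg")) {
        System.out.println(name.tags() + " -> " + metric.metricValue());
    }
});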

Examples

Here are some examples of programs using the new operator to achieve various goals.

Final window results per key

Imagine we wish to send an alert if a user has fewer than 3 events in an hour. For the example, we'll wait up to 10 minutes after the hour ends for late-arriving events.

No Format
builder.<Integer, String>stream("events")
    .groupByKey()
    .windowedBy(TimeWindows.of(3600_000).grace(600_000))
    .count()
    .suppress(untilWindowCloses(BufferConfig.unbounded()))
    .toStream()
    .filter((key, value) -> value < 3)
    .foreach((key, value) -> sendAlert("User " + key.key() + " had fewer than 3 events in window " + key.window()));

Note that we can handle limited memory in a few different ways:

No Format
// Option 1: expect not to run out of memory
windowCounts
  .suppress(untilWindowCloses(unbounded()));


// Option 2: shut down gracefully if we need too much memory
windowCounts
  .suppress(untilWindowCloses(maxBytes(5_000_000).shutDownWhenFull()));


// Option 3: Start using disk if we need too much memory
windowCounts
  .suppress(untilWindowCloses(maxBytes(5_000_000).spillToDiskWhenFull()));

Any of these constructions yields a strict guarantee that each windowed key will emit exactly one result.

Note about the "shut down when full" option:

This may seem like a strange option for production code, but consider that in practice there is limited heap size available. As with all data structures, if you need to store more data than fits in memory, then you will run out of memory and crash. For Java in particular, as the available heap approaches 0, the garbage collector will consume more and more CPU, which can cause the application to become unresponsive long before an actual OutOfMemoryError occurs.

The idea of this option is to pick some reasonably large bound to allow a graceful and performant shutdown before this occurs.
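
For instance, one might derive the bound from the configured heap (an illustrative sizing heuristic, not a recommendation of this KIP; the 20% figure is an arbitrary example):

No Format
// Reserve a fraction of the maximum heap for the suppression buffer, leaving headroom
// for the rest of the application.
final long maxHeapBytes = Runtime.getRuntime().maxMemory();
final long bufferBound = (long) (maxHeapBytes * 0.2);

windowCounts.suppress(untilWindowCloses(maxBytes(bufferBound).shutDownWhenFull()));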

Then, the operator can increase the heap size and restart, or the developer can decrease the grace period (decreasing the required heap), or choose another strategy, such as on-disk buffering.


Rate-limited updates

Suppose we wish to reduce the rate of updates from a KTable to roughly one update every 30s per key. We don't want to use too much memory for this, and we don't think we'll have updates for more than 1000 keys at any one time.

No Format
table
  .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000)))
  .toStream(); // etc.
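
If more than 1000 keys may be in flight at once, a non-strict buffer can still cap memory by emitting the oldest records early rather than growing. An illustrative variant (not part of the example above):

No Format
table
  .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000).emitEarlyWhenFull()))
  .toStream(); // etc.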


Compatibility, Deprecation, and Migration Plan

...