Comment: Updated the KIP with the latest results of the discussion.

...

At the moment, I'm proposing to provide two main kinds of suppressions:

  1. All but the final result for windows. You can use suppress to get exactly one final result per window/key for windowed computations. This includes both time and session windows.
    1. This feature requires adding a "grace period" parameter for windows.
  2. Intermediate Events. You can use suppress to wait for more updates on the same key before emitting them downstream. There are two main configuration knobs to tune here:
    1. How long to wait for more updates before emitting. This is an amount of time, measured either from the event time (for regular KTables) or from the window end (for windowed KTables), for which each key is buffered before it is emitted downstream.
    2. How much memory to use for buffering events (and what to do when it fills up). Suppress needs to remember the last observed event for each key, which takes memory. You can configure the amount of memory either by number of keys or by raw heap size. You can configure what to do when the buffer runs out of memory:
      1. Emit when full. This option is basically "best effort" suppression. If suppress runs out of memory, it will emit the oldest event.
      2. Spill to disk. This option is for when you need suppression to be strict. If the operator runs out of memory, it will allocate a RocksDB database and begin storing new records there. This may be complicated to implement, and consequently might be a future addition.
      3. Shut down. There is an option to run with the assumption that your "live set" of suppression events will fit within memory. If this assumption turns out to be false, the application will run out of memory and shut down. This may sound strange, but it's no different than using an InMemoryKeyValueStore or even an ArrayList. In the case of suppress, you can either leave the memory buffer unconstrained, which will cause an application crash if it runs out of memory, or you can configure it with a memory constraint and tell it to shut down gracefully when it runs out. Either one is safe from a correctness standpoint, but the latter will result in faster recovery. 
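The following is a rough in-memory sketch of the three buffer-full strategies. It is not the Kafka Streams implementation. The class `SuppressionBuffer` and its key-count limit are hypothetical, and a second map stands in for the RocksDB store that "spill to disk" would allocate. Under EMIT, "oldest" is taken to mean the key that has been buffered the longest, which is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.function.BiConsumer;

// Hypothetical standalone copy of the proposal's buffer-full strategies.
enum BufferFullStrategy { EMIT, SPILL_TO_DISK, SHUT_DOWN }

// Sketch only: remembers the latest value per key, bounded by a number of keys.
class SuppressionBuffer<K, V> {
    private final int maxKeys;
    private final BufferFullStrategy strategy;
    private final BiConsumer<K, V> downstream;
    // Insertion-ordered, so the first entry is the key buffered the longest.
    private final LinkedHashMap<K, V> memory = new LinkedHashMap<>();
    // Stand-in for the RocksDB store that SPILL_TO_DISK would allocate.
    private final LinkedHashMap<K, V> disk = new LinkedHashMap<>();

    SuppressionBuffer(int maxKeys, BufferFullStrategy strategy, BiConsumer<K, V> downstream) {
        this.maxKeys = maxKeys;
        this.strategy = strategy;
        this.downstream = downstream;
    }

    void put(K key, V value) {
        // A newer update for a remembered key simply replaces the old value.
        if (memory.containsKey(key)) { memory.put(key, value); return; }
        if (disk.containsKey(key)) { disk.put(key, value); return; }
        if (memory.size() < maxKeys) { memory.put(key, value); return; }
        switch (strategy) {
            case EMIT: {
                // Best effort: emit the oldest buffered key early to make room.
                K oldestKey = memory.keySet().iterator().next();
                V oldestValue = memory.remove(oldestKey);
                downstream.accept(oldestKey, oldestValue);
                memory.put(key, value);
                break;
            }
            case SPILL_TO_DISK:
                // Strict: keep suppressing, but store the new key off-heap.
                disk.put(key, value);
                break;
            case SHUT_DOWN:
                // Strict: fail fast instead of emitting early.
                throw new IllegalStateException("suppression buffer full: " + maxKeys + " keys");
        }
    }

    int memorySize() { return memory.size(); }

    int diskSize() { return disk.size(); }
}
```

With `maxKeys = 2` and EMIT, buffering `a`, `b`, then updating `a` and adding `c` pushes the latest value of `a` downstream early. SHUT_DOWN throws at the same point.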

...

You can achieve exactly one event per key per window by configuring:

  • A grace period on the window. This would be a balance of how late you expect events to be and how long you're willing to wait before finalizing the window and emitting the result.
  • Suppress with "emitFinalResultsOnly". There's a specific "Suppress" configuration for this feature. It allows you to configure the memory buffer and decide whether to shut down or spill to disk when it fills up.
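To make the combination concrete, here is a small event-time simulation of "grace period + final results only" for counts over tumbling windows. It is a sketch under stated assumptions, not the proposed implementation. `FinalResultsSketch` is hypothetical, and stream time is taken to be the maximum timestamp observed so far. A window is treated as closed once stream time reaches window end plus grace. Intermediate counts stay buffered, records for closed windows are dropped, and each window/key is emitted exactly once when it closes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of windowed counts with a grace period and final-results suppression.
class FinalResultsSketch {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;
    // Latest count per "key@windowStart", held until the window closes.
    private final Map<String, Long> buffer = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>();
    int dropped = 0;

    FinalResultsSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    void process(String key, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - Math.floorMod(timestampMs, sizeMs);
        if (streamTime >= windowStart + sizeMs + graceMs) {
            dropped++; // the window is closed: the late record can no longer update it
        } else {
            buffer.merge(key + "@" + windowStart, 1L, Long::sum); // intermediate update, suppressed
        }
        flushClosedWindows();
    }

    private void flushClosedWindows() {
        Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            String k = e.getKey();
            long windowStart = Long.parseLong(k.substring(k.lastIndexOf('@') + 1));
            if (streamTime >= windowStart + sizeMs + graceMs) {
                emitted.add(k + "=" + e.getValue()); // the single final result for this window/key
                it.remove();
            }
        }
    }
}
```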

Best-effort rate limit per key

...

  • Unlimited memory (either with spill-to-disk or just not limiting the memory)
  • The inverse of the desired rate. If the desired rate is R (events per second), the inverse is the buffering time: 1/R (seconds per event).
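As a quick check of the arithmetic, the helper below converts a desired per-key rate R (events per second) into the buffering time 1/R. `RateLimitMath.emitAfterFor` is a hypothetical name used only for illustration. A rate of one update every 30 seconds (R = 1/30) gives 30 seconds.

```java
import java.time.Duration;

// Hypothetical helper: buffering time (1/R seconds) for a desired rate of R events per second.
class RateLimitMath {
    static Duration emitAfterFor(double eventsPerSecond) {
        if (eventsPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        // 1/R seconds, expressed in nanoseconds and rounded to the nearest nanosecond.
        return Duration.ofNanos(Math.round(1_000_000_000d / eventsPerSecond));
    }
}
```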

Details and Public Interfaces


There are two primary changes we're proposing: Altering the window definition and adding a KTable#suppress operator.

Add Grace Period to Window Spec Builders

For our new Suppress operator to support window final results, we need to define a point at which window results are actually final!

Currently, the only hint we have about this is the retention period (called until/maintainMs/retentionPeriod). Once we drop an old window, we can obviously never update it again. However, this isn't a convenient "final" point for our purpose. Retention time is typically very large (the default is one day), and users of IQ may need to keep the retention time large in order to support queries even over final windows. Plus, how long a window is retained is really a property of the window store implementation. In principle, an in-memory implementation might choose to retain events for a short time while a remote distributed store may keep them forever. This shouldn't prohibit the usage of final results, though.

To resolve this conflict, we're adding a new concept to the window spec: grace period. This is an amount of time that the window should accept late-arriving events after the window ends. After the grace period passes, the window is considered "closed" and will never be updated again. The grace period places a lower-bound constraint on the retention time, but otherwise has no implication on retention.
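The sketch below states the grace-period rule as code. It is illustrative only. `GracePeriodSketch` is hypothetical, and stream time is assumed to be the maximum record timestamp observed so far. The exact form of the retention lower bound is also an assumption: window size plus grace, so that a store never drops a window before it closes.

```java
import java.time.Duration;

// Hypothetical helpers illustrating the grace-period rules; not Kafka Streams API.
class GracePeriodSketch {
    // A window ending at windowEndMs accepts late events until stream time reaches end + grace.
    static boolean isClosed(long windowEndMs, Duration grace, long streamTimeMs) {
        return streamTimeMs >= windowEndMs + grace.toMillis();
    }

    // Assumed form of the lower bound: retain each window at least until it closes.
    static void validateRetention(Duration windowSize, Duration grace, Duration retention) {
        Duration minimum = windowSize.plus(grace);
        if (retention.compareTo(minimum) < 0) {
            throw new IllegalArgumentException(
                "retention " + retention + " is shorter than window size + grace " + minimum);
        }
    }
}
```

With a one-hour window and a 10-minute grace period, the window ending at 3,600,000 ms closes once stream time reaches 4,200,000 ms.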

We will add a "grace()" configuration to the window spec builders (Windows, TimeWindows, JoinWindows, UnlimitedWindows, and SessionWindows).

We are also deprecating the retention time as specified on the window spec itself and moving it to the lower-level store configurations.

No Format
+ /**
+  * Reject late events that arrive more than {@code afterWindowEnd}
+  * after the end of their window.
+  *
+  * @param afterWindowEnd The grace period to admit late-arriving events to a window.
+  * @return this updated builder
+  */
+ public Windows<W> grace(final Duration afterWindowEnd);

+ /**
+  * Return the window grace period (the time to admit
+  * late-arriving events after the end of the window).
+  */
+ public Duration grace();


/**
 ...
+ * @deprecated since 2.1. Use {@link Joined#retention()} 
+ * or {@link Materialized#retention}
+ * or directly configure the retention in a store supplier and use
+ * {@link Materialized#as(WindowBytesStoreSupplier)}.
 */
+ @Deprecated
public Windows<W> until(final long durationMs);

Note that for UnlimitedWindows in particular, the grace period is meaningless because the window never ends. Therefore, we provide this interface instead:

No Format
/**
 * Throws an {@link IllegalArgumentException} because the window never ends and the
 * grace period is therefore meaningless.
 *
 * @throws IllegalArgumentException on every invocation
 */
@Override
public Windows<UnlimitedWindow> grace(final Duration afterWindowEnd);
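A simplified sketch of how the builder and its UnlimitedWindows override might fit together follows. The `Sketch*` classes are hypothetical stand-ins, not the real window spec classes. The zero default grace is a placeholder for this sketch only, because the proposal does not state a default here.

```java
import java.time.Duration;

// Hypothetical, simplified stand-ins for the window spec builders.
abstract class SketchWindows {
    // Placeholder default for this sketch only; the proposal does not specify one here.
    private Duration grace = Duration.ZERO;

    // Setter: admit late-arriving events for this long after the window ends.
    SketchWindows grace(Duration afterWindowEnd) {
        this.grace = afterWindowEnd;
        return this;
    }

    // Getter: the configured grace period.
    Duration grace() {
        return grace;
    }
}

class SketchTimeWindows extends SketchWindows {
}

class SketchUnlimitedWindows extends SketchWindows {
    // Unlimited windows never end, so a grace period after the end is meaningless.
    @Override
    SketchWindows grace(Duration afterWindowEnd) {
        throw new IllegalArgumentException(
            "Grace period is meaningless for unlimited windows, which never end.");
    }
}
```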


The Window/Session BytesStoreSupplier interfaces already include a retention period. Currently, a retention period set on the supplier overrides any maintainMs set on the window. We'll preserve this behavior.

We will add retention to the Joined config object, which is already used to configure the underlying stores for KStream joins:

No Format
/**
 * Configure the retention time for events in the join.
 * Events are only considered for joining while they are retained.
 * 
 * @param retention the retention period for events in the join
 * @return itself
 */
public Joined<K, V, VO> withRetention(final Duration retention);


public Duration retention();

Likewise, we will add retention to Materialized:

No Format
/**
 * Configure retention period for window and session stores. Ignored for key/value stores.
 *
 * Overridden by pre-configured store suppliers
 * ({@link Materialized#as(SessionBytesStoreSupplier)} or {@link Materialized#as(WindowBytesStoreSupplier)}).
 *
 * @return itself
 */
public Materialized<K, V, S> withRetention(final Duration retention);
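The precedence described above can be summarized in a few lines. This is a hypothetical helper, not Kafka Streams code. Retention is ignored for key/value stores, a pre-configured store supplier's retention wins, and otherwise the value from `withRetention` applies. An empty result means "fall back to whatever default applies", which this sketch does not model.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical resolution of the effective retention period for a store.
class RetentionResolution {
    static Optional<Duration> effectiveRetention(boolean windowedStore,
                                                 Optional<Duration> supplierRetention,
                                                 Optional<Duration> configuredRetention) {
        if (!windowedStore) {
            return Optional.empty(); // ignored for key/value stores
        }
        if (supplierRetention.isPresent()) {
            return supplierRetention; // a pre-configured store supplier overrides withRetention
        }
        return configuredRetention; // may be empty: fall back to a default (not modeled)
    }
}
```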

New Suppress Operator

We will add the new operator to KTable:

No Format
public interface KTable<K, V> {


 /**
 * Suppress some updates from this changelog stream, determined by the supplied {@link Suppress}.
 *
 * This controls what updates downstream table and stream operations will receive.
 *
 * @param suppress Configuration object determining what, if any, updates to suppress.
 * @return A new KTable with the desired suppress characteristics.
 */
 KTable<K, V> suppress(final Suppress<K, V> suppress);


}

Note the absence of a variant taking Materialized. The result of a suppression will always be (eventually) consistent with the source KTable, so I'm thinking right now that it would be "bad advice" to present the option to materialize it.

We will also create the config object Suppress:

No Format
public interface Suppress<K, V> {

    interface TimeDefinition<K, V> {
        long time(ProcessorContext context, K k, V v);
    }

    enum BufferFullStrategy {
        EMIT,
        SPILL_TO_DISK,
        SHUT_DOWN
    }

    interface BufferConfig<K, V> {
        static <K, V> BufferConfig<K, V> withBufferKeys(final long numberOfKeysToRemember);

        BufferConfig<K, V> bufferKeys(final long numberOfKeysToRemember);

        static <K, V> BufferConfig<K, V> withBufferBytes(final long bytesToUseForSuppressionStorage,
                                                       final Serializer<K> keySerializer,
                                                       final Serializer<V> valueSerializer);

        BufferConfig<K, V> bufferBytes(final long bytesToUseForSuppressionStorage,
                                       final Serializer<K> keySerializer,
                                       final Serializer<V> valueSerializer);

        static <K, V> BufferConfig<K, V> withBufferFullStrategy(final BufferFullStrategy bufferFullStrategy);

        BufferConfig<K, V> bufferFullStrategy(final BufferFullStrategy bufferFullStrategy);
    }

    interface IntermediateSuppression<K, V> {
        static <K, V> IntermediateSuppression<K, V> withEmitAfter(final Duration timeToWaitForMoreEvents);

        IntermediateSuppression<K, V> emitAfter(final Duration timeToWaitForMoreEvents);

        static <K, V> IntermediateSuppression<K, V> withBufferConfig(final BufferConfig<K, V> bufferConfig);

        IntermediateSuppression<K, V> bufferConfig(final BufferConfig<K, V> bufferConfig);
    }

    static <K extends Windowed, V> Suppress<K, V> emitFinalResultsOnly(final BufferConfig<K, V> bufferConfig);

    static <K, V> Suppress<K, V> intermediateEvents(final IntermediateSuppression<K, V> intermediateSuppression);
}

Metrics

Along with the suppression operator, we will add several metrics. Note that suppress will not add to the skipped-records metrics. "Skipped" records are records that are for one reason or another invalid. "Suppressed" records are intentionally dropped, just like filtered records. Likewise with events arriving later than the grace period for windows.

Note: I'm not proposing roll-up metrics for these. They would be reported at the processor-node level. I suspect this is actually fine, and roll-ups can easily be added later if necessary.

Metrics we'll add:

  • late events
    • average and max observed lateness of all records: to help configure the grace period
      • (INFO) event-lateness-[avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of dropped events for closed windows
      • (INFO) late-event-drop-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
  • intermediate event suppression
    • current, average, and peak intermediate suppression buffer size
      • (INFO) suppression-mem-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • current, average, and peak intermediate suppression disk buffer size (only present when using the "SPILL_TO_DISK" buffer-full strategy)
      • (INFO) suppression-disk-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • intermediate suppression buffer eviction rate and total: to show how often events are emitted early (only present when using the "EMIT" buffer-full strategy)
      • (INFO) suppression-mem-buffer-evict-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of intermediate event suppressions
      • (INFO) intermediate-event-suppression-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • min and average intermediate suppression overtime: to determine whether the intermediate suppression emitAfter is delaying longer than necessary. This metric may be unnecessary, since it's equivalent to (timeToWaitForMoreEventsConfig - observedLatenessMetric).
      • (INFO) intermediate-event-suppression-overtime-[min | avg] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
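To show what the event-lateness metric would report, here is a minimal tracker. It is a sketch that assumes lateness is stream time (the maximum timestamp seen so far) minus the record's timestamp, so records that advance stream time have lateness 0. `LatenessTracker` is hypothetical. Unlike a real metrics sensor, it keeps cumulative totals rather than windowed samples.

```java
// Hypothetical tracker for event-lateness-[avg | max]; cumulative, not sampled.
class LatenessTracker {
    private long streamTime = Long.MIN_VALUE;
    private long count = 0;
    private long sum = 0;
    private long maxLateness = 0;

    void record(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long lateness = streamTime - timestampMs; // 0 for records that advance stream time
        count++;
        sum += lateness;
        maxLateness = Math.max(maxLateness, lateness);
    }

    double avg() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    long max() {
        return maxLateness;
    }
}
```

A maximum observed lateness that sits close to the configured grace period suggests that some events are being dropped as late, and that the grace period may need to grow.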

Examples

Here are some examples of programs using the new operator to achieve various goals.

Final window results per key

Imagine we wish to send an alert if a user has fewer than 3 events in an hour. For the example, we'll wait up to 10 minutes after the hour ends for late-arriving events.

No Format
builder.<Integer, String>stream("events")
    .groupByKey()
    .windowedBy(TimeWindows.of(3600_000).grace(Duration.ofMinutes(10)))
    .count()
    .suppress(emitFinalResultsOnly(withBufferFullStrategy(SHUT_DOWN)))
    .toStream()
    .filter((key, value) -> value < 3)
    .foreach((key, value) -> sendAlert("User " + key.key() + " had fewer than 3 events in window " + key.window()));

Note that we can handle limited memory in a few different ways:

No Format
// Option 1: expect not to run out of memory
windowCounts
  .suppress(emitFinalResultsOnly(withBufferKeys(Long.MAX_VALUE)));


// Option 2: shut down gracefully if we need too much memory
windowCounts
  .suppress(
    emitFinalResultsOnly(
      withBufferBytes(5_000_000, keySerializer, valueSerializer)
        .bufferFullStrategy(SHUT_DOWN)
    )
  );


// Option 3: start using disk if we need too much memory
windowCounts
  .suppress(
    emitFinalResultsOnly(
      withBufferBytes(5_000_000, keySerializer, valueSerializer)
        .bufferFullStrategy(SPILL_TO_DISK)   // <- NOTE
    )
  );

Each of these constructions guarantees that every windowed key emits exactly one event.

...

Suppose we wish to reduce the rate of updates from a KTable to roughly one update every 30s per key. We don't want to use too much memory for this, and we don't think we'll have updates for more than 1000 keys at any one time.

No Format
table.suppress(
    intermediateEvents(
        withEmitAfter(Duration.ofSeconds(30))
            .bufferConfig(withBufferKeys(1000).bufferFullStrategy(EMIT))
    )
).toStream(); // etc.
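The effect of `emitAfter` on a single key can be simulated in event time. `EmitAfterSketch` is hypothetical, and it models only the time dimension (the 1000-key EMIT bound is left out). It assumes that the wait is measured from when a key was first buffered, so that a key updated continuously still emits roughly once per `emitAfter`. Measuring from the latest update would instead delay a busy key indefinitely, which would defeat rate limiting.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of intermediate suppression with emitAfter, driven by event time.
class EmitAfterSketch {
    private final long emitAfterMs;
    private long streamTime = Long.MIN_VALUE;
    // Event time at which each key entered the buffer (assumed start of the wait).
    private final Map<String, Long> bufferedSince = new LinkedHashMap<>();
    // Latest value per buffered key; intermediate values are overwritten.
    private final Map<String, String> latest = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>();

    EmitAfterSketch(long emitAfterMs) {
        this.emitAfterMs = emitAfterMs;
    }

    void update(String key, String value, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        bufferedSince.putIfAbsent(key, timestampMs);
        latest.put(key, value);
        // Emit every key whose wait has elapsed, carrying only its latest value.
        Iterator<Map.Entry<String, Long>> it = bufferedSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (streamTime >= e.getValue() + emitAfterMs) {
                emitted.add(e.getKey() + "=" + latest.remove(e.getKey()));
                it.remove();
            }
        }
    }
}
```

With a 30-second `emitAfter`, updates `v1`, `v2`, `v3` to the same key at 0 s, 10 s and 30 s produce a single downstream update, `v3`, at the 30-second mark.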


Compatibility, Deprecation, and Migration Plan

The only part of the KIP that's relevant to existing APIs is the deprecation of Windows#until/maintainMs. I've described above how the deprecation warnings will look, and also what new APIs will replace them. All the implementations will be done in such a way that existing Streams applications will have exactly the same semantics before and after this KIP, so there's no concern about continuing to use the deprecated APIs.

One other change we could consider in the future is to revisit the state store caching mechanism, but that also serves the function of limiting i/o to the state store, so I think that should be a separate discussion.

Rejected Alternatives

There are many alternative ways to achieve this behavior.

At the top of the list, I also considered having dedicated operators for final events on windowed tables and update suppression on non-windowed ones. But the current proposal supports the same behavior with just one new operator.

We also considered having windowed computations directly provide the "final results" feature via an "Emitted" config object, but ultimately settled on adding the grace period to the window and letting "suppress" deal with suppressing all but the final result.

In fact, I previously proposed not to support "final results" directly, but instead to allow a suppression with an upper bound on lateness. Using the same time for this lateness bound and for intermediate suppression would naturally yield final results only. But we judged that this API was too esoteric. The version we have now is much more straightforward for this use case.