Comment: A couple of tweaks that arose during implementation.

...

No Format
+ /**
+  * Reject late events that arrive more than {@code millisAfterWindowEnd}
+  * after the end of their window.
+  *
+  * @param millisAfterWindowEnd The grace period to admit late-arriving events to a window.
+  * @return this updated builder
+  */
+ public Windows<W> grace(final long millisAfterWindowEnd);

+ /**
+  * Return the window grace period (the time to admit
+  * late-arriving events after the end of the window).
+  */
+ public long gracePeriodMs();


/**
 ...
+ * @deprecated since 2.1. Use {@link Joined#retention()} 
+ * or {@link Materialized#retention}
+ * or directly configure the retention in a store supplier and use
+ * {@link Materialized#as(WindowBytesStoreSupplier)}.
 */
+ @Deprecated
public Windows<W> until(final long durationMs);

/**
 ...
 * @deprecated since 2.1. Use {@link Joined#retention()} or {@link Materialized#retention} instead.
 */
@Deprecated
public long maintainMs();


/**
 ...
 * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
 */
@Deprecated
public long segmentInterval();
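
To make the grace-period semantics concrete, here is a minimal sketch of the admission check described in the Javadoc above. It is not Kafka Streams code: the class `GracePeriodSketch`, its method names, the tumbling-window arithmetic, and the strict "more than" comparison at the boundary are all assumptions made for illustration.

```java
// Hypothetical sketch, not Kafka Streams code: all names here are illustrative.
final class GracePeriodSketch {

    // End (exclusive) of the tumbling window that contains the timestamp.
    // Assumes non-negative timestamps and a positive window size.
    static long windowEnd(final long recordTimestampMs, final long windowSizeMs) {
        return (recordTimestampMs / windowSizeMs + 1) * windowSizeMs;
    }

    // A record is rejected once stream time is more than the grace period
    // past the end of the record's window (the strict comparison is an assumption).
    static boolean isRejected(final long windowEndMs, final long gracePeriodMs, final long streamTimeMs) {
        return streamTimeMs - windowEndMs > gracePeriodMs;
    }

    public static void main(final String[] args) {
        // One-hour windows with a ten-minute grace period.
        final long end = windowEnd(3_000_000L, 3_600_000L);
        System.out.println("window end: " + end);
        System.out.println("rejected at 4_000_000: " + isRejected(end, 600_000L, 4_000_000L));
        System.out.println("rejected at 4_300_000: " + isRejected(end, 600_000L, 4_300_000L));
    }
}
```

Under these assumptions, a record for a window is still admitted while stream time is within the grace period of the window end, and rejected afterwards.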

...

No Format
/**
 * Throws an {@link IllegalArgumentException} because the window never ends and the
 * grace period is therefore meaningless.
 *
 * @throws IllegalArgumentException on every invocation
 */
@Override
public Windows<UnlimitedWindow> grace(final long millisAfterWindowEnd);


The Window/Session BytesStoreSupplier interface already includes a retention period. The existing behavior is that this retention period overrides maintainMs if set on the window. We'll preserve this behavior.
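
The precedence rule can be sketched as follows. This is only an illustration of the rule as stated, not the actual resolution code: `RetentionSketch` and `effectiveRetentionMs` are hypothetical names, and modeling "no store supplier configured" as an empty `OptionalLong` is an assumption.

```java
import java.util.OptionalLong;

// Hypothetical sketch, not Kafka Streams code: illustrates the precedence rule only.
final class RetentionSketch {

    // The store supplier's retention wins when present; otherwise fall back
    // to the window's maintainMs.
    static long effectiveRetentionMs(final OptionalLong supplierRetentionMs, final long windowMaintainMs) {
        return supplierRetentionMs.orElse(windowMaintainMs);
    }

    public static void main(final String[] args) {
        System.out.println(effectiveRetentionMs(OptionalLong.of(7_200_000L), 86_400_000L));
        System.out.println(effectiveRetentionMs(OptionalLong.empty(), 86_400_000L));
    }
}
```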

We will add retention to the Joined config object, which is already used to configure the underlying stores for KStream joins:

No Format
/**
 * Configure the retention time for events in the join.
 * Events are only considered for joining while they are retained.
 * 
 * @param retention the retention time for events in the join
 * @return a Joined configured with the given retention
 */
public Joined<K, V, VO> withRetention(final Duration retention);


public Duration retention();

Likewise, we will add retention to Materialized:

No Format
/**
 * Configure retention period for window and session stores. Ignored for key/value stores.
 *
 * Overridden by pre-configured store suppliers
 * ({@link Materialized#as(SessionBytesStoreSupplier)} or {@link Materialized#as(WindowBytesStoreSupplier)}).
 *
 * @return itself
 */
public Materialized<K, V, S> withRetention(final long retentionMs);

New Suppress Operator

We will add the new operator to KTable:

...

No Format
public interface Suppress<K, V> {

    interface TimeDefinition<K, V> {
        long time(ProcessorContext context, K k, V v);
    }

    enum BufferFullStrategy {
        EMIT,
        SPILL_TO_DISK,
        SHUT_DOWN
    }

    interface BufferConfig<K, V> {
        static <K, V> BufferConfig<K, V> withBufferKeys(final long numberOfKeysToRemember)

        BufferConfig<K, V> bufferKeys(final long numberOfKeysToRemember);

        static <K, V> BufferConfig<K, V> withBufferBytes(final long bytesToUseForSuppressionStorage,
                                                         final Serializer<K> keySerializer,
                                                         final Serializer<V> valueSerializer)

        BufferConfig<K, V> bufferBytes(final long bytesToUseForSuppressionStorage, 
                                       final Serializer<K> keySerializer,
                                       final Serializer<V> valueSerializer);

        static <K, V> BufferConfig<K, V> withBufferFullStrategy(final BufferFullStrategy bufferFullStrategy)

        BufferConfig<K, V> bufferFullStrategy(final BufferFullStrategy bufferFullStrategy);
    }

    interface IntermediateSuppression<K, V> {
        static <K, V> IntermediateSuppression<K, V> withEmitAfter(final long millisToWaitForMoreEvents)

        IntermediateSuppression<K, V> emitAfter(final long millisToWaitForMoreEvents);

        static <K, V> IntermediateSuppression<K, V> withBufferConfig(final BufferConfig<K, V> bufferConfig)

        IntermediateSuppression<K, V> bufferConfig(final BufferConfig<K, V> bufferConfig);
    }

    static <K extends Windowed, V> Suppress<K, V> emitFinalResultsOnly(final BufferConfig<K, V> bufferConfig)

    static <K, V> Suppress<K, V> intermediateEvents(final IntermediateSuppression<K, V> intermediateSuppression)
}
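
As a rough illustration of how a key-bounded buffer might behave under the EMIT and SHUT_DOWN strategies, consider the sketch below. It is not the proposed implementation: `SuppressionBufferSketch`, its methods, and the choice to evict the oldest buffered key are assumptions, and SPILL_TO_DISK is deliberately left out.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka Streams code: a buffer bounded by number of keys.
final class SuppressionBufferSketch<K, V> {

    // Mirrors two of the strategies above; SPILL_TO_DISK is not modeled here.
    enum BufferFullStrategy { EMIT, SHUT_DOWN }

    private final long maxKeys;
    private final BufferFullStrategy strategy;
    // Insertion-ordered, so the first entry is the oldest buffered key.
    private final LinkedHashMap<K, V> buffer = new LinkedHashMap<>();
    // Stand-in for records forwarded downstream early.
    private final List<Map.Entry<K, V>> emitted = new ArrayList<>();

    SuppressionBufferSketch(final long maxKeys, final BufferFullStrategy strategy) {
        this.maxKeys = maxKeys;
        this.strategy = strategy;
    }

    // A newer value for a buffered key replaces the older one (the suppression itself).
    void put(final K key, final V value) {
        buffer.put(key, value);
        if (buffer.size() > maxKeys) {
            if (strategy == BufferFullStrategy.SHUT_DOWN) {
                throw new IllegalStateException("suppression buffer is full");
            }
            // EMIT: evict the oldest key and forward its latest value early.
            final Iterator<Map.Entry<K, V>> it = buffer.entrySet().iterator();
            final Map.Entry<K, V> oldest = it.next();
            emitted.add(new AbstractMap.SimpleImmutableEntry<>(oldest.getKey(), oldest.getValue()));
            it.remove();
        }
    }

    List<Map.Entry<K, V>> emitted() {
        return emitted;
    }

    int size() {
        return buffer.size();
    }

    public static void main(final String[] args) {
        final SuppressionBufferSketch<String, Integer> b =
            new SuppressionBufferSketch<>(2, BufferFullStrategy.EMIT);
        b.put("a", 1);
        b.put("a", 2);
        b.put("b", 1);
        b.put("c", 1);
        System.out.println("emitted early: " + b.emitted());
    }
}
```

With EMIT, exceeding the key limit trades completeness of suppression for bounded memory; with SHUT_DOWN, the sketch fails fast instead of emitting a non-final result.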

...

  • late records (new records older than the grace period) are currently metered as "skipped-records" and logged at WARN level. As noted, this is not correct, so we will change the logs to DEBUG level and add new metrics: 
    • average and max observed lateness of all records: to help configure the grace period
      • (INFO) record-lateness-[avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of dropped events for closed windows
      • (INFO) late-record-drop-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
  • expired records (new records for segments older than the state store's retention period) are currently not metered and logged at DEBUG level. Since the grace period is currently equivalent to the retention period, this should currently be rare, as such events would never reach the state store but be marked as skipped and dropped in the processor. However, since we are deprecating `Windows.until` and defining retention only on the state store, it would become much more common. Therefore, we'll add some new state store metrics:
    • rate and total of events for expired windows
      • (INFO) expired-window-record-drop-[rate | total] type=stream-[storeType]-state-metrics client-id=<threadId> task-id=<taskId> [storeType]-state-id=[storeName]
  • intermediate event suppression
    • current, average, and peak intermediate suppression buffer size
      • (INFO) suppression-mem-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • current, average, and peak intermediate suppression disk buffer size (only present when using the "SPILL_TO_DISK" buffer-full strategy)
      • (INFO) suppression-disk-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • intermediate suppression buffer eviction rate and total: to measure how often events are emitted early (only present when using the "EMIT" buffer-full strategy)
      • (INFO) suppression-mem-buffer-evict-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of intermediate result suppressions
      • (INFO) intermediate-result-suppression-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • min and average intermediate suppression overtime: to determine whether the intermediate suppression emitAfter is delaying longer than necessary. This metric may be unnecessary, since it's equivalent to (timeToWaitForMoreEventsConfig - observedLatenessMetric).
      • (INFO) intermediate-result-suppression-overtime-[min | avg] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
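
To illustrate how the average and max lateness metrics above could be derived, here is a minimal sketch. It is not the metrics implementation: `LatenessSketch` is a hypothetical name, and defining lateness as stream time minus record timestamp, floored at zero, is an assumption.

```java
// Hypothetical sketch, not Kafka Streams code: tracks average and max observed lateness.
final class LatenessSketch {
    private long count;
    private long sumMs;
    private long maxMs;

    // Lateness is assumed to be how far the record's timestamp lags stream time;
    // records at or ahead of stream time count as zero lateness.
    void record(final long streamTimeMs, final long recordTimestampMs) {
        final long latenessMs = Math.max(0L, streamTimeMs - recordTimestampMs);
        count++;
        sumMs += latenessMs;
        maxMs = Math.max(maxMs, latenessMs);
    }

    double avg() {
        return count == 0 ? 0.0 : (double) sumMs / count;
    }

    long max() {
        return maxMs;
    }

    public static void main(final String[] args) {
        final LatenessSketch lateness = new LatenessSketch();
        lateness.record(100L, 90L);
        lateness.record(100L, 120L);
        lateness.record(200L, 140L);
        System.out.println("avg=" + lateness.avg() + " max=" + lateness.max());
    }
}
```

An operator could compare the observed max against the configured grace period to judge whether the grace period is too tight or needlessly generous.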

...

No Format
builder.<Integer, String>stream("events")
    .groupByKey()
    .windowedBy(TimeWindows.of(3600_000).grace(600_000))
    .count()
    .suppress(emitFinalResultsOnly(withBufferFullStrategy(SHUT_DOWN)))
    .toStream()
    .filter((key, value) -> value < 3)
    .foreach((key, value) -> sendAlert("User " + key.key() + " had fewer than 3 events in window " + key.window()));

...

No Format
table.suppress(
  intermediateEvents(
    withEmitAfter(30_000)
      .bufferConfig(
        withBufferKeys(1000).bufferFullStrategy(EMIT)
      )
  )
).toStream(); // etc.

...