Comment: Rename Suppress -> Suppressed

...

No Format
public interface KTable<K, V> {


 /**
 * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed}.
 *
 * This controls what updates downstream table and stream operations will receive.
 *
 * @param suppressed Configuration object determining what, if any, updates to suppress.
 * @return A new KTable with the desired suppress characteristics.
 */
 KTable<K, V> suppress(final Suppressed<K, V> suppressed);


}

Note the absence of a variant taking Materialized. The result of a suppression will always be (eventually) consistent with the source KTable, so my current thinking is that it would be "bad advice" to offer the option to materialize it.

We will also create the config object Suppressed:

No Format
public interface Suppressed<K, V> {

    interface TimeDefinition<K, V> {
        long time(ProcessorContext context, K k, V v);
    }

    enum BufferFullStrategy {
        EMIT,
        SPILL_TO_DISK,
        SHUT_DOWN
    }

    interface BufferConfig<K, V> {
        static <K, V> BufferConfig<K, V> withBufferKeys(final long numberOfKeysToRemember);

        BufferConfig<K, V> bufferKeys(final long numberOfKeysToRemember);

        static <K, V> BufferConfig<K, V> withBufferBytes(final long bytesToUseForSuppressionStorage,
                                                         final Serializer<K> keySerializer,
                                                         final Serializer<V> valueSerializer);

        BufferConfig<K, V> bufferBytes(final long bytesToUseForSuppressionStorage,
                                       final Serializer<K> keySerializer,
                                       final Serializer<V> valueSerializer);

        static <K, V> BufferConfig<K, V> withBufferFullStrategy(final BufferFullStrategy bufferFullStrategy);

        BufferConfig<K, V> bufferFullStrategy(final BufferFullStrategy bufferFullStrategy);
    }

    interface IntermediateSuppression<K, V> {
        static <K, V> IntermediateSuppression<K, V> withEmitAfter(final long millisToWaitForMoreEvents);

        IntermediateSuppression<K, V> emitAfter(final long millisToWaitForMoreEvents);

        static <K, V> IntermediateSuppression<K, V> withBufferConfig(final BufferConfig<K, V> bufferConfig);

        IntermediateSuppression<K, V> bufferConfig(final BufferConfig<K, V> bufferConfig);
    }

    static <K extends Windowed, V> Suppressed<K, V> emitFinalResultsOnly(final BufferConfig<K, V> bufferConfig);

    static <K, V> Suppressed<K, V> intermediateEvents(final IntermediateSuppression<K, V> intermediateSuppression);
}
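
To give a feel for how the static with* entry points and the chained instance methods might compose, here is a minimal, self-contained sketch. It is not the proposed implementation: the class SuppressedSketch and its fields are hypothetical, generics and the byte-based buffer bound are dropped to keep it short, and every default value is an assumption, since the proposal above does not state any.

```java
// Hypothetical, simplified model of the proposed config objects.
// Generics and the byte-based bound are omitted; all defaults are assumptions.
class SuppressedSketch {

    enum BufferFullStrategy { EMIT, SPILL_TO_DISK, SHUT_DOWN }

    /** Immutable stand-in for the proposed BufferConfig. */
    static final class BufferConfig {
        final long maxKeys;
        final BufferFullStrategy strategy;

        private BufferConfig(final long maxKeys, final BufferFullStrategy strategy) {
            this.maxKeys = maxKeys;
            this.strategy = strategy;
        }

        // Static entry point; the EMIT default is an assumption of this sketch.
        static BufferConfig withBufferKeys(final long numberOfKeysToRemember) {
            return new BufferConfig(numberOfKeysToRemember, BufferFullStrategy.EMIT);
        }

        // Static entry point; the Long.MAX_VALUE key bound is an assumption of this sketch.
        static BufferConfig withBufferFullStrategy(final BufferFullStrategy bufferFullStrategy) {
            return new BufferConfig(Long.MAX_VALUE, bufferFullStrategy);
        }

        // Instance methods return a modified copy, so calls can be chained.
        BufferConfig bufferKeys(final long numberOfKeysToRemember) {
            return new BufferConfig(numberOfKeysToRemember, strategy);
        }

        BufferConfig bufferFullStrategy(final BufferFullStrategy bufferFullStrategy) {
            return new BufferConfig(maxKeys, bufferFullStrategy);
        }
    }

    /** Immutable stand-in for the proposed IntermediateSuppression. */
    static final class IntermediateSuppression {
        final long emitAfterMillis;
        final BufferConfig bufferConfig;

        private IntermediateSuppression(final long emitAfterMillis, final BufferConfig bufferConfig) {
            this.emitAfterMillis = emitAfterMillis;
            this.bufferConfig = bufferConfig;
        }

        // The default buffer config used here is an assumption of this sketch.
        static IntermediateSuppression withEmitAfter(final long millisToWaitForMoreEvents) {
            return new IntermediateSuppression(
                millisToWaitForMoreEvents,
                BufferConfig.withBufferFullStrategy(BufferFullStrategy.EMIT));
        }

        // The default wait of 0 ms is an assumption of this sketch.
        static IntermediateSuppression withBufferConfig(final BufferConfig bufferConfig) {
            return new IntermediateSuppression(0L, bufferConfig);
        }

        IntermediateSuppression emitAfter(final long millisToWaitForMoreEvents) {
            return new IntermediateSuppression(millisToWaitForMoreEvents, bufferConfig);
        }

        IntermediateSuppression bufferConfig(final BufferConfig newBufferConfig) {
            return new IntermediateSuppression(emitAfterMillis, newBufferConfig);
        }
    }

    public static void main(final String[] args) {
        // Wait 5 seconds for more events, remember at most 1000 keys, shut down when full.
        final IntermediateSuppression suppression = IntermediateSuppression
            .withEmitAfter(5_000L)
            .bufferConfig(BufferConfig.withBufferKeys(1_000L)
                                      .bufferFullStrategy(BufferFullStrategy.SHUT_DOWN));

        System.out.println(suppression.emitAfterMillis + " ms, "
            + suppression.bufferConfig.maxKeys + " keys, "
            + suppression.bufferConfig.strategy);
    }
}
```

Pairing each static with* method with an instance method of the same purpose lets a caller start a configuration from whichever setting matters most and chain the rest, without a separate builder type. In the real generic interfaces, a chain that starts from a static generic method may need an explicit type witness, which this sketch sidesteps by dropping the type parameters.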

...