Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Don't need serializers in the interface (since they can be inherited now)

...

No Format
/**
 * API sketch for configuring suppression. Two entry points are declared:
 * {@link #emitFinalResultsOnly} and {@link #intermediateEvents}.
 *
 * <p>This is a signature-only sketch: the static factory declarations carry no
 * bodies and are not meant to compile as written.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface Suppressed<K, V> {

    /**
     * Produces a {@code long} time for a record, given the processor context,
     * the key, and the value. (Units are not specified in this sketch.)
     */
    interface TimeDefinition<K, V> {
        long time(ProcessorContext context, K k, V v);
    }

    /**
     * What to do when the suppression buffer is full.
     * NOTE(review): per the usage examples, SHUT_DOWN shuts down gracefully and
     * SPILL_TO_DISK starts using disk; EMIT presumably emits early - confirm.
     */
    enum BufferFullStrategy {
        EMIT,
        SPILL_TO_DISK,
        SHUT_DOWN
    }

    /**
     * Buffer limits for suppression. Each setting has a static {@code withX}
     * factory and an instance method of the same meaning for chaining.
     */
    interface BufferConfig<K, V> {
        /** Creates a config bounded by a number of keys. */
        static <K, V> BufferConfig<K, V> withBufferKeys(final long numberOfKeysToRemember)

        /** Sets the number of keys to remember on this config. */
        BufferConfig<K, V> bufferKeys(final long numberOfKeysToRemember);

        /** Creates a config bounded by a number of bytes, with explicit serializers. */
        static <K, V> BufferConfig<K, V> withBufferBytes(final long bytesToUseForSuppressionStorage,
                                                         final Serializer<K> keySerializer,
                                                         final Serializer<V> valueSerializer)

        /**
         * Sets the byte bound on this config, with explicit serializers.
         * NOTE(review): the diff rendering fused this signature with the
         * serializer-free one below; the change comment suggests the serializer
         * parameters may be slated for removal - confirm which overloads remain.
         */
        BufferConfig<K, V> bufferBytes(final long bytesToUseForSuppressionStorage,
                                       final Serializer<K> keySerializer,
                                       final Serializer<V> valueSerializer);

        /** Sets the byte bound on this config without passing serializers. */
        BufferConfig<K, V> bufferBytes(final long bytesToUseForSuppressionStorage);

        /** Creates a config with the given buffer-full strategy. */
        static <K, V> BufferConfig<K, V> withBufferFullStrategy(final BufferFullStrategy bufferFullStrategy)

        /** Sets the buffer-full strategy on this config. */
        BufferConfig<K, V> bufferFullStrategy(final BufferFullStrategy bufferFullStrategy);
    }

    /**
     * Settings for suppressing intermediate events: a wait time in
     * milliseconds and a {@link BufferConfig}.
     */
    interface IntermediateSuppression<K, V> {
        /** Creates a suppression that waits the given milliseconds for more events. */
        static <K, V> IntermediateSuppression<K, V> withEmitAfter(final long millisToWaitForMoreEvents)

        /** Sets the milliseconds to wait for more events. */
        IntermediateSuppression<K, V> emitAfter(final long millisToWaitForMoreEvents);

        /** Creates a suppression using the given buffer config. */
        static <K, V> IntermediateSuppression<K, V> withBufferConfig(final BufferConfig<K, V> bufferConfig)

        /** Sets the buffer config. */
        IntermediateSuppression<K, V> bufferConfig(final BufferConfig<K, V> bufferConfig);
    }

    /** Creates a suppression that emits only final results; keys must be {@code Windowed}. */
    static <K extends Windowed, V> Suppressed<K, V> emitFinalResultsOnly(final BufferConfig<K, V> bufferConfig)

    /** Creates a suppression for intermediate events from the given settings. */
    static <K, V> Suppressed<K, V> intermediateEvents(final IntermediateSuppression<K, V> intermediateSuppression)
}

...

No Format
// Option 1: expect not to run out of memory
// (the key bound is Long.MAX_VALUE)
windowCounts
  .suppress(emitFinalResultsOnly(withBufferKeys(Long.MAX_VALUE)));


// Option 2: shut down gracefully if we need too much memory
// (byte bound of 5_000_000 with the SHUT_DOWN strategy)
windowCounts
  .suppress(
    emitFinalResultsOnly(
      withBufferBytes(5_000_000, keySerializer, valueSerializer)
        .bufferFullStrategy(SHUT_DOWN)
    )
  );


// Option 3: Start using disk if we need too much memory
// (byte bound of 5_000_000 with the SPILL_TO_DISK strategy)
windowCounts
  .suppress(
    emitFinalResultsOnly(
      withBufferBytes(5_000_000, keySerializer, valueSerializer)
        .bufferFullStrategy(SPILL_TO_DISK)
    )
  );

...