Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
package org.apache.kafka.streams.state;

// new classed and interfaces

public class ValueAndTimestamp<V> {
    private ValueAndTimestamp(final V value, final long timestamp); // use `make()` instead

    public V value();
    public long timestamp();
    public static <V> ValueAndTimestamp<V> make(final V value, final long timestamp); // returns `null` if `value==null`
    public static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp); // returns `null` if `valueAndTimestamp==null`
}

public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedSessionStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedBytesStore {
    static byte[] convertToTimestampedFormat(final byte[] plainValue);
}

// extend existing classes (omitting existing method)

public final class Stores {
    public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name);

    public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
                                                                            final Duration retentionPeriod,
                                                                            final Duration windowSize,
                                                                            final boolean retainDuplicates);

    public static SessionBytesStoreSupplier persistentTimestampedSessionStore(final String name,
                                                                              final Duration retentionPeriod);

    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                      final Serde<K> keySerde,
                                                                                                      final Serde<V> valueSerde);


    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
                                                                                                  final Serde<K> keySerde,
                                                                                                  final Serde<V> valueSerde;

    public static <K, V> StoreBuilder<TimestampedSessionStore<K, V>> timestampedSessionStoreBuilder(final SessionBytesStoreSupplier supplier,
                                                                                                    final Serde<K> keySerde,
                                                                                                    final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore();

    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> timestampedWindowStore();

    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>>> timestampedSessionStore();
}


// test-utils package


public class TopologyTestDriver {
    public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name);

    public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name);

    public <K, V> SessionStore<K, ValueAndTimestamp<V>> getTimestampedSessionStore(final String name);
}

...