Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
package org.apache.kafka.streams.state;

// new classed and interfaces

public class ValueAndTimestamp<V> {
    V value();
    long timestamp();
}

public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedSessionStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedBytesStore {
    static byte[] convertToTimestampedFormat(final byte[] plainValue);
}

// extend existing classes (omitting existing method)

public final class Stores {
    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                      final Serde<K> keySerde,
                                                                                                      final Serde<V> valueSerde);


    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
                                                                                                  final Serde<K> keySerde,
                                                                                                  final Serde<V> valueSerde;

    public static <K, V> StoreBuilder<TimestampedSessionStore<K, V>> timestampedSessionStoreBuilder(final SessionBytesStoreSupplier supplier,
                                                                                                    final Serde<K> keySerde,
                                                                                                    final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>><K, V>> timestampedKeyValueStore();

    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>><K, V>> timestampedWindowStore();

    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>><K, V>> timestampedSessionStore();
}


// test-utils package


public class TopologyTestDriver {
    public <K, V> TimestampedKeyValueStore<KKeyValueStore<K, V>ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name);

    public <K, V> TimestampedWindowStore<KWindowStore<K, V>ValueAndTimestamp<V>> getTimestampedWindowStore(final String name);

    public <K, V> TimestampedSessionStore<KSessionStore<K, V>ValueAndTimestamp<V>> getTimestampedSessionStore(final String name);
}

...