THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams.state; // new classed and interfaces public class ValueAndTimestamp<V> { V value(); long timestamp(); } public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {} public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {} public interface TimestampedSessionStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {} public interface TimestampedBytesStore { static byte[] convertToTimestampedFormat(final byte[] plainValue); } // extend existing classes (omitting existing method) public final class Stores { public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde); public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde; public static <K, V> StoreBuilder<TimestampedSessionStore<K, V>> timestampedSessionStoreBuilder(final SessionBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde); } public final class QueryableStoreTypes { public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>><K, V>> timestampedKeyValueStore(); public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>><K, V>> timestampedWindowStore(); public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>><K, V>> timestampedSessionStore(); } // test-utils package public class TopologyTestDriver { public <K, V> TimestampedKeyValueStore<KKeyValueStore<K, V>ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name); public <K, V> TimestampedWindowStore<KWindowStore<K, V>ValueAndTimestamp<V>> getTimestampedWindowStore(final String name); public <K, V> TimestampedSessionStore<KSessionStore<K, V>ValueAndTimestamp<V>> getTimestampedSessionStore(final String name); } |
...