THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams.state; // new interfaces public interface ValueAndTimestamp<V> { V value(); long timestamp(); } public interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> { void put(K key, V value, long timestamp); ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp); } public interface WindowWithTimestampStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> { // note, we don't add `put(K key, V value, long timestamp)` void put(K key, V value, long timestamp, long windowStartTimestamp); } public interface SessionWithTimestampStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> { void put(final Windowed<K> sessionKey, final AGG aggregate, final long timestamp); } public interface RecordConverter { ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record); } // extend existing classes (omitting existing method) public final class Stores { public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueWithTimestampStoreBuilder(final KeyValueBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde); public static <K, V> StoreBuilder<WindowWithTimestampStore<K, V>> windowWithTimestampStoreBuilder(final WindowBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde; public static <K, V> StoreBuilder<SessionWithTimestampsStore<K, V>> sessionWithTimestampStoreBuilder(final SessionBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde); } public final class QueryableStoreTypes { public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>><K, V>> keyValueWithTimestampStore(); public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>><K, V>> windowWithTimestampStore(); public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>><K, V>> sessionWithTimestampStore(); } // test-utils package public class TopologyTestDriver { public <K, V> KeyValueWithTimestampStore<K, V> getKeyValueWithTimestampStore(final String name); public <K, V> WindowWithTimestampStore<K, V> getWindowWithTimestampStore(final String name); public <K, V> SessionWithTimestampStore<K, V> getSessionWithTimestampStore(final String name); } |
...