...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams.state;
// new interfaces
public interface ValueAndTimestamp<V> {
V value();
long timestamp();
}
public interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
void put(K key, V value, long timestamp);
ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);
}
public interface WindowWithTimestampStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {
// note, we don't add `put(K key, V value, long timestamp)`
void put(K key, V value, long timestamp, long windowStartTimestamp);
}
public interface SessionWithTimestampStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {
void put(final Windowed<K> sessionKey, final V aggregate, final long timestamp);
}
public interface RecordConverter {
ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record);
}
// extend existing classes (omitting existing methods)
public final class Stores {
public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueWithTimestampStoreBuilder(final KeyValueBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde);
public static <K, V> StoreBuilder<WindowWithTimestampStore<K, V>> windowWithTimestampStoreBuilder(final WindowBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde);
public static <K, V> StoreBuilder<SessionWithTimestampStore<K, V>> sessionWithTimestampStoreBuilder(final SessionBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde);
}
public final class QueryableStoreTypes {
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> keyValueWithTimestampStore();
public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> windowWithTimestampStore();
public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>>> sessionWithTimestampStore();
}
// test-utils package
public class TopologyTestDriver {
public <K, V> KeyValueWithTimestampStore<K, V> getKeyValueWithTimestampStore(final String name);
public <K, V> WindowWithTimestampStore<K, V> getWindowWithTimestampStore(final String name);
public <K, V> SessionWithTimestampStore<K, V> getSessionWithTimestampStore(final String name);
} |
New store implementations (RocksDB and in-memory) will be added for the corresponding new interfaces. The existing stores stay as-is; the new stores are pure additions that Processor API (PAPI) users can use as well. Nothing switches automatically: PAPI users who want to move from the existing stores to the new ones have to rewrite their application accordingly.
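To make the intended semantics of the timestamped key-value interface more concrete, here is a minimal, self-contained sketch. It does not use any Kafka classes: `TimestampedStoreSketch` and `InMemoryTimestampedStore` are hypothetical names invented for illustration, and the assumption that `putIfAbsent` returns the previous entry (or `null` if the key was absent) is borrowed from the existing `KeyValueStore#putIfAbsent` contract rather than stated in this proposal.

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampedStoreSketch {

    // Mirrors the proposed ValueAndTimestamp<V> interface from the listing above.
    interface ValueAndTimestamp<V> {
        V value();
        long timestamp();
    }

    // Hypothetical in-memory stand-in for KeyValueWithTimestampStore; not Kafka code.
    static final class InMemoryTimestampedStore<K, V> {
        private final Map<K, ValueAndTimestamp<V>> data = new HashMap<>();

        // Wraps a plain value and its timestamp into a single stored entry.
        private static <T> ValueAndTimestamp<T> wrap(final T value, final long timestamp) {
            return new ValueAndTimestamp<T>() {
                @Override public T value() { return value; }
                @Override public long timestamp() { return timestamp; }
            };
        }

        // Unconditionally stores the value together with its timestamp.
        void put(final K key, final V value, final long timestamp) {
            data.put(key, wrap(value, timestamp));
        }

        // Assumed contract: returns the existing entry and leaves it untouched,
        // or stores the new entry and returns null if the key was absent.
        ValueAndTimestamp<V> putIfAbsent(final K key, final V value, final long timestamp) {
            return data.putIfAbsent(key, wrap(value, timestamp));
        }

        ValueAndTimestamp<V> get(final K key) {
            return data.get(key);
        }
    }

    public static void main(final String[] args) {
        final InMemoryTimestampedStore<String, Long> store = new InMemoryTimestampedStore<>();
        store.put("clicks", 1L, 100L);
        // The key is already present, so the existing entry is returned and kept.
        final ValueAndTimestamp<Long> previous = store.putIfAbsent("clicks", 2L, 200L);
        final ValueAndTimestamp<Long> current = store.get("clicks");
        System.out.println(previous.value() + "@" + previous.timestamp());
        System.out.println(current.value() + "@" + current.timestamp());
    }
}
```

Reads return the value and its timestamp as one `ValueAndTimestamp<V>` object, while writes take them as separate arguments, which matches the shape of the proposed `put(K key, V value, long timestamp)` method.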
...