Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
package org.apache.kafka.streams.state;

// new interfaces

/**
 * Pairs a value with a timestamp.
 *
 * @param <V> type of the wrapped value
 */
public interface ValueAndTimestamp<V> {
    /** @return the wrapped value */
    V value();

    /**
     * @return the timestamp paired with the value
     *         (NOTE(review): unit and origin are not specified here; presumably the record
     *         timestamp in epoch milliseconds - confirm)
     */
    long timestamp();
}

/**
 * Key-value store whose values are {@link ValueAndTimestamp} pairs. In addition to the inherited
 * {@code KeyValueStore<K, ValueAndTimestamp<V>>} methods, it offers overloads that take the plain
 * value and its timestamp as separate arguments.
 *
 * @param <K> key type
 * @param <V> plain value type (stored wrapped in {@link ValueAndTimestamp})
 */
public interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    /**
     * Writes {@code value} with {@code timestamp} for {@code key}.
     *
     * @param key       key to write
     * @param value     plain value to write
     * @param timestamp timestamp to write alongside the value
     */
    void put(K key, V value, long timestamp);

    /**
     * Variant of {@link #put(Object, Object, long)} for keys that have no entry yet.
     *
     * @param key       key to write
     * @param value     plain value to write
     * @param timestamp timestamp to write alongside the value
     * @return a value-and-timestamp pair (NOTE(review): presumably the previously stored entry, or
     *         {@code null} if none existed, mirroring {@code KeyValueStore#putIfAbsent} - confirm)
     */
    ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);
}

/**
 * Window store whose values are {@link ValueAndTimestamp} pairs.
 *
 * @param <K> key type
 * @param <V> plain value type (stored wrapped in {@link ValueAndTimestamp})
 */
public interface WindowWithTimestampStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {
    // note, we don't add `put(K key, V value, long timestamp)`
    /**
     * Writes {@code value} with {@code timestamp} for {@code key} in the window identified by
     * {@code windowStartTimestamp}.
     *
     * @param key                  key to write
     * @param value                plain value to write
     * @param timestamp            timestamp to write alongside the value
     * @param windowStartTimestamp start timestamp of the target window
     */
    void put(K key, V value, long timestamp, long windowStartTimestamp);
}

/**
 * Session store whose values are {@link ValueAndTimestamp} pairs.
 *
 * @param <K> key type
 * @param <V> plain aggregate type (stored wrapped in {@link ValueAndTimestamp})
 */
public interface SessionWithTimestampStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {
    /**
     * Writes {@code aggregate} with {@code timestamp} for the given session.
     *
     * @param sessionKey windowed key identifying the session
     * @param aggregate  plain aggregate value to write; typed as {@code V}, the value type
     *                   parameter declared by this interface
     * @param timestamp  timestamp to write alongside the aggregate
     */
    void put(final Windowed<K> sessionKey, final V aggregate, final long timestamp);
}
/**
 * Converts one raw (byte-array keyed and valued) consumer record into another.
 *
 * NOTE(review): the intended use is not shown here; presumably this translates records from the
 * plain store format into the with-timestamp format - confirm.
 */
public interface RecordConverter {
    /**
     * @param record raw record to convert
     * @return the converted raw record
     */
    ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record);
}

// extensions to existing classes (existing methods omitted)

public final class Stores {
    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueWithTimestampStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                          final Serde<K> keySerde,
                                                                                                          final Serde<V> valueSerde);


    public static <K, V> StoreBuilder<WindowWithTimestampStore<K, V>> windowWithTimestampStoreBuilder(final WindowBytesStoreSupplier supplier,
                                                                                                      final Serde<K> keySerde,
                                                                                                      final Serde<V> valueSerde;

    public static <K, V> StoreBuilder<SessionWithTimestampsStore<K, V>> sessionWithTimestampStoreBuilder(final SessionBytesStoreSupplier supplier,
                                                                                                         final Serde<K> keySerde,
                                                                                                         final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>><K, V>> keyValueWithTimestampStore();

    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>><K, V>> windowWithTimestampStore();

    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>><K, V>> sessionWithTimestampStore();
}


// test-utils package


/**
 * Test-utils driver; the methods listed here look up with-timestamp state stores by name.
 */
public class TopologyTestDriver {
    /**
     * @param name name of the store to look up
     * @param <K>  key type
     * @param <V>  plain value type
     * @return the key-value-with-timestamp store registered under {@code name}
     *         (NOTE(review): behavior for an unknown name or mismatched store type is not shown
     *         here - confirm)
     */
    public <K, V> KeyValueWithTimestampStore<K, V> getKeyValueWithTimestampStore(final String name);

    /**
     * @param name name of the store to look up
     * @param <K>  key type
     * @param <V>  plain value type
     * @return the window-with-timestamp store registered under {@code name}
     */
    public <K, V> WindowWithTimestampStore<K, V> getWindowWithTimestampStore(final String name);

    /**
     * @param name name of the store to look up
     * @param <K>  key type
     * @param <V>  plain value type
     * @return the session-with-timestamp store registered under {@code name}
     */
    public <K, V> SessionWithTimestampStore<K, V> getSessionWithTimestampStore(final String name);
}

...