...

The challenging part of this KIP is to define a smooth upgrade path for the upgraded RocksDB format. Note that only DSL users will be affected, because we want to switch the default stores.

Public Interfaces

We add three new store types:

  • TimestampedKeyValueStore that extends the existing KeyValueStore
  • TimestampedWindowStore that extends the existing WindowStore
  • TimestampedSessionStore that extends the existing SessionStore

...

For a seamless single rolling bounce upgrade, we introduce a RecordConverter interface. This interface can be used to transform binary data from the old format into the new format. This is required for reading data from the changelog topic during restore: in our case, the concrete implementation will copy the timestamp from the record timestamp field into the value (upgrade details are discussed below). However, the RecordConverter interface is a generic concept that could be used for other transformations and does not necessarily apply only to upgrading stores.
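To illustrate the idea, the following is a minimal sketch of such a converter, assuming the new value format simply prepends the 8-byte record timestamp to the old value (the class name and the exact layout are illustrative assumptions, not the proposed implementation):

Code Block
languagejava
import java.nio.ByteBuffer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// hypothetical sketch of a RecordConverter that copies the record timestamp
// into the value; the actual implementation and value layout may differ
public class TimestampPrependingRecordConverter implements RecordConverter {
    @Override
    public ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
        final byte[] oldValue = record.value();
        byte[] newValue = null;
        if (oldValue != null) {
            // assumed new format: <8-byte timestamp><old value bytes>
            newValue = ByteBuffer.allocate(8 + oldValue.length)
                .putLong(record.timestamp())
                .put(oldValue)
                .array();
        }
        // for brevity, this sketch only preserves topic/partition/offset metadata
        return new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.key(), newValue);
    }
}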

Proposed Changes

To make use of the new timestamp that can be stored, we need to add new interfaces to the existing store interfaces that allow reading and writing the timestamp and the value at once.

Code Block
languagejava
package org.apache.kafka.streams.state;

// new interfaces

public interface ValueAndTimestamp<V> {
    V value();
    long timestamp();
}

public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    void put(K key, V value, long timestamp);
    ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);
}

public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {
    // note, we don't add `put(K key, V value, long timestamp)`
    void put(K key, V value, long timestamp, long windowStartTimestamp);
}

public interface TimestampedSessionStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {
    void put(final Windowed<K> sessionKey, final V aggregate, final long timestamp);
}

public interface RecordConverter {
    ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record);
}

// extend existing classes (omitting existing methods)

public final class Stores {
    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                       final Serde<K> keySerde,
                                                                                                       final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
                                                                                                   final Serde<K> keySerde,
                                                                                                   final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<TimestampedSessionStore<K, V>> timestampedSessionStoreBuilder(final SessionBytesStoreSupplier supplier,
                                                                                                     final Serde<K> keySerde,
                                                                                                     final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore();

    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> timestampedWindowStore();

    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>>> timestampedSessionStore();
}


// test-utils package


public class TopologyTestDriver {
    public <K, V> TimestampedKeyValueStore<K, V> getTimestampedKeyValueStore(final String name);

    public <K, V> TimestampedWindowStore<K, V> getTimestampedWindowStore(final String name);

    public <K, V> TimestampedSessionStore<K, V> getTimestampedSessionStore(final String name);
}
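For example, a test could then access a timestamped store as follows (a sketch; `topology`, `props`, and the store name "counts" are assumptions for this example):

Code Block
languagejava
// hypothetical test snippet; `topology` and `props` are assumed to be
// configured elsewhere, with a store named "counts" in the topology
final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
final TimestampedKeyValueStore<String, Long> store =
    driver.getTimestampedKeyValueStore("counts");
final ValueAndTimestamp<Long> valueAndTimestamp = store.get("some-key");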

New stores (RocksDB, in-memory) will be added that implement the corresponding new interfaces. Note that we keep the existing stores as-is and only add new stores, which can be used by PAPI users, too; PAPI users would need to rewrite their application to switch from the existing stores to the new stores if desired.
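For illustration, a PAPI user could register and use a timestamped store roughly as follows (a sketch under assumed names; "counts", "processor", and the wiring are placeholders):

Code Block
languagejava
// hypothetical PAPI registration of a timestamped store; the processor
// named "processor" and the `topology` instance are assumed to exist
final StoreBuilder<TimestampedKeyValueStore<String, Long>> storeBuilder =
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentKeyValueStore("counts"),
        Serdes.String(),
        Serdes.Long());

topology.addStateStore(storeBuilder, "processor");

// inside the processor, value and timestamp can then be read/written at once:
//   final TimestampedKeyValueStore<String, Long> store =
//       (TimestampedKeyValueStore<String, Long>) context.getStateStore("counts");
//   store.put(key, value, context.timestamp());
//   final ValueAndTimestamp<Long> valueAndTimestamp = store.get(key);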

...

  • For locally stored data, reads will be served with surrogate timestamp -1 (semantics is unknown).
  • On put, data will be stored using the new format.
  • Key-value store: the new TimestampedRocksDBStore is used. To isolate old and new format, we use two column families. We perform dual put/get operations on the new and old column families to lazily upgrade all data from the old to the new format (see the sketch after this list).
  • Window/session store: existing segments will use old format and new segments will be created with the new format.
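The following sketch illustrates the lazy-upgrade read path with two column families (the structure is an assumption for illustration only, not the actual TimestampedRocksDBStore code):

Code Block
languagejava
import java.nio.ByteBuffer;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// illustrative only: not the actual TimestampedRocksDBStore implementation
final class DualColumnFamilyGetExample {

    // returns the value in the new format, migrating old-format entries on access
    static byte[] get(final RocksDB db,
                      final ColumnFamilyHandle newColumnFamily,
                      final ColumnFamilyHandle oldColumnFamily,
                      final byte[] key) throws RocksDBException {
        final byte[] newFormatValue = db.get(newColumnFamily, key);
        if (newFormatValue != null) {
            return newFormatValue;
        }
        final byte[] oldFormatValue = db.get(oldColumnFamily, key);
        if (oldFormatValue == null) {
            return null;
        }
        // lazy upgrade: attach surrogate timestamp -1 and move the entry
        final byte[] upgraded = ByteBuffer.allocate(8 + oldFormatValue.length)
            .putLong(-1L)
            .put(oldFormatValue)
            .array();
        db.put(newColumnFamily, key, upgraded);
        db.delete(oldColumnFamily, key);
        return upgraded;
    }
}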

...

There is one special case, namely when users use a custom XxxByteStoreSupplier. In this case, the returned store won't implement RecordConverter and won't understand the new format. Hence, we use a proxy store to remove timestamps on write and to add surrogate timestamp -1 (semantics is unknown) on read (note that for non-persistent custom stores, we don't need to use the proxy). Using the proxy is required to guarantee a backward-compatible upgrade path. Users implementing a custom XxxByteStoreSupplier can opt in and extend their stores to implement the RecordConverter interface, too. In this case, we don't wrap the store with a proxy, and it's the user's responsibility to implement the custom store correctly to migrate from the old to the new format.
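The following sketch illustrates the proxy idea, again assuming the new value format prepends an 8-byte timestamp (class and method shapes are illustrative only, not the actual proxy implementation):

Code Block
languagejava
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;

// hypothetical proxy around a user's custom (old-format) byte store
final class TimestampedProxySketch {
    private final KeyValueStore<Bytes, byte[]> inner;

    TimestampedProxySketch(final KeyValueStore<Bytes, byte[]> inner) {
        this.inner = inner;
    }

    // read path: attach surrogate timestamp -1 (semantics is unknown)
    byte[] get(final Bytes key) {
        final byte[] plainValue = inner.get(key);
        if (plainValue == null) {
            return null;
        }
        return ByteBuffer.allocate(8 + plainValue.length)
            .putLong(-1L)
            .put(plainValue)
            .array();
    }

    // write path: strip the leading 8-byte timestamp before forwarding
    void put(final Bytes key, final byte[] valueWithTimestamp) {
        final byte[] plainValue = valueWithTimestamp == null
            ? null
            : Arrays.copyOfRange(valueWithTimestamp, 8, valueWithTimestamp.length);
        inner.put(key, plainValue);
    }
}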

Compatibility, Deprecation, and Migration Plan

A simple rolling bounce upgrade is sufficient. For PAPI users, nothing changes at all. For DSL users, the internal TimestampedRocksDBStore (as one example) will upgrade the store data lazily in the background. Only if users provide a custom XxxByteStoreSupplier does no upgrade happen and the old format is kept (users can opt in by implementing the RecordConverter interface). In that case, we use a proxy store that removes/adds the timestamp on write/read.

To keep the interactive queries feature compatible, the new TimestampedXxxStores can be queried with and without timestamps. This is important for DSL users who might query a store as a KeyValueStore while the DSL switches the store internally to a TimestampedKeyValueStore. Thus, we allow querying a TimestampedKeyValueStore with the queryable store type "key-value" and remove the timestamp under the hood on read. Compatibility for window/session stores is handled similarly.
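For example (a sketch; "counts" is a placeholder store name and `streams` is a running KafkaStreams instance), the same store could then be queried both ways:

Code Block
languagejava
// query with timestamps, using the new queryable store type
final ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>> timestampedView =
    streams.store("counts", QueryableStoreTypes.timestampedKeyValueStore());

// legacy view of the same store: the timestamp is stripped under the hood
final ReadOnlyKeyValueStore<String, Long> plainView =
    streams.store("counts", QueryableStoreTypes.keyValueStore());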

The RecordConverter interface will be implemented by Kafka Streams for the newly added stores. Users who implement custom key-value/window/session stores are not affected, as their stores will be wrapped by the proxy store. Users can opt in, of course, and change their custom key-value/window/session store implementations to support the new TimestampedXxxStore interfaces by implementing RecordConverter in their own store classes; in that case, no proxy is used. For other user-implemented stores that add a new StateStore type, nothing changes for existing code. Again, users can now also implement the RecordConverter interface; however, those stores are "black boxes" to Kafka Streams anyway and cannot be used in DSL operators, and thus it is the user's responsibility to implement their stores correctly if they use the RecordConverter interface. Note that the RecordConverter interface can be used as a generic interface to translate the format of records stored in a changelog topic into a different format used by the state store.

Test Plan

This feature can mainly be tested via unit and integration tests:

...