...

  • TimestampedKeyValueStore that extends the existing KeyValueStore
  • TimestampedWindowStore that extends the existing WindowStore
  • TimestampedSessionStore that extends the existing SessionStore

Those new stores will be used by DSL operators. For PAPI users, nothing changes for existing applications. Of course, the new stores can be used, too.

For a seamless single rolling bounce upgrade, we introduce a TimestampedBytesStore interface. This interface is used by byte-stores to indicate that they expect the new value+timestamp format. Additionally, it provides a static helper method to convert byte[] arrays in the old "plain value" format to the new "value+timestamp" format.
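As a sketch of that conversion helper, assuming the new format is an 8-byte timestamp followed by the plain value (the exact byte layout, the class name `TimestampedFormatSketch`, and the `UNKNOWN_TIMESTAMP` constant are illustrative assumptions, not the actual internal implementation):

```java
import java.nio.ByteBuffer;

public class TimestampedFormatSketch {
    // surrogate timestamp with "unknown" semantics, as used for upgraded data
    static final long UNKNOWN_TIMESTAMP = -1L;

    // Prepend an 8-byte timestamp field to a plain-format value.
    static byte[] convertToTimestampedFormat(final byte[] plainValue) {
        if (plainValue == null) {
            return null;
        }
        return ByteBuffer.allocate(8 + plainValue.length)
            .putLong(UNKNOWN_TIMESTAMP)
            .put(plainValue)
            .array();
    }

    public static void main(final String[] args) {
        final byte[] upgraded = convertToTimestampedFormat(new byte[] {1, 2, 3});
        System.out.println(upgraded.length);                     // 11
        System.out.println(ByteBuffer.wrap(upgraded).getLong()); // -1
    }
}
```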

Proposed Changes

To make use of the new timestamp that can be stored, we need to add new classes and interfaces to the existing store interfaces that allow reading/writing the timestamp and the value at once.

Code Block
package org.apache.kafka.streams.state;

// new classes and interfaces

public class ValueAndTimestamp<V> {
    V value();
    long timestamp();
}

public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    void put(K key, V value, long timestamp);
    ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);
}

public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {
    // note, we don't add `put(K key, V value, long timestamp)`
    void put(K key, V value, long timestamp, long windowStartTimestamp);
}

public interface TimestampedSessionStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {
    void put(final Windowed<K> sessionKey, final V aggregate, final long timestamp);
}

public interface TimestampedBytesStore {
    static byte[] convertToTimestampedFormat(final byte[] plainValue);
}

// extend existing classes (omitting existing methods)

public final class Stores {
    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                      final Serde<K> keySerde,
                                                                                                      final Serde<V> valueSerde);


    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
                                                                                                  final Serde<K> keySerde,
                                                                                                  final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<TimestampedSessionStore<K, V>> timestampedSessionStoreBuilder(final SessionBytesStoreSupplier supplier,
                                                                                                    final Serde<K> keySerde,
                                                                                                    final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore();

    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> timestampedWindowStore();

    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>>> timestampedSessionStore();
}


// test-utils package


public class TopologyTestDriver {
    public <K, V> TimestampedKeyValueStore<K, V> getTimestampedKeyValueStore(final String name);

    public <K, V> TimestampedWindowStore<K, V> getTimestampedWindowStore(final String name);

    public <K, V> TimestampedSessionStore<K, V> getTimestampedSessionStore(final String name);
}

...

  • For locally stored data, reads will be served with surrogate timestamp -1 (semantics: "unknown").
  • On put, data will be stored using the new format.
  • Key-value store: the new TimestampedRocksDBStore is used. To isolate old and new format, we use two column families. We perform dual put/get operations on the new and old column families to lazily upgrade all data from the old to the new format.
  • Window/Session stores: existing segments will use the old format and new segments will be created with the new format.
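The lazy dual column-family upgrade for the key-value store can be illustrated with a toy model. Two `HashMap`s stand in for the RocksDB column families; all names here are illustrative, not the actual internal API:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class LazyUpgradeSketch {
    // stand-ins for the old-format and new-format column families
    final Map<String, byte[]> oldCf = new HashMap<>();
    final Map<String, byte[]> newCf = new HashMap<>();

    // writes always use the new format and drop stale old-format data
    void put(final String key, final byte[] valueWithTimestamp) {
        newCf.put(key, valueWithTimestamp);
        oldCf.remove(key);
    }

    // reads check the new column family first, then fall back to the old
    // one, upgrading the entry on the fly
    byte[] get(final String key) {
        final byte[] v = newCf.get(key);
        if (v != null) {
            return v;
        }
        final byte[] plain = oldCf.get(key);
        if (plain == null) {
            return null;
        }
        // prepend surrogate timestamp -1 ("unknown") and migrate lazily
        final byte[] upgraded = ByteBuffer.allocate(8 + plain.length)
            .putLong(-1L).put(plain).array();
        put(key, upgraded);
        return upgraded;
    }

    public static void main(final String[] args) {
        final LazyUpgradeSketch store = new LazyUpgradeSketch();
        store.oldCf.put("k", new byte[] {42});            // pre-existing old-format entry
        System.out.println(store.get("k").length);        // 9 (8-byte timestamp + value)
        System.out.println(store.oldCf.containsKey("k")); // false, entry was migrated
    }
}
```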

The described upgrade path works out-of-the-box for library-provided store implementations. All new stores implement the TimestampedBytesStore interface, which indicates that the store can handle old and new formats and performs a seamless upgrade internally.

There is one special case, in which users use a custom XxxByteStoreSupplier. For this case, the returned store won't implement TimestampedBytesStore and won't understand the new format. Hence, we use a proxy store to remove timestamps on write and to add surrogate timestamp -1 (semantics: "unknown") on read (note that for non-persistent custom stores, we don't need to use the proxy). Using the proxy is required to guarantee a backward compatible upgrade path. Users implementing a custom XxxByteStoreSupplier can opt in and extend their stores to implement the TimestampedBytesStore interface, too. For this case, we don't wrap the store with a proxy, and it's the user's responsibility to implement the custom store correctly to migrate from the old to the new format. Users whose custom stores implement TimestampedBytesStore and who want to upgrade existing data from the old to the new format can use the TimestampedBytesStore#convertToTimestampedFormat() method.
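A minimal sketch of the proxy idea, again assuming the value+timestamp format is an 8-byte timestamp followed by the plain value (the class name and `Map`-backed inner store are illustrative, not the actual proxy implementation):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ProxyStoreSketch {
    // stand-in for a user's custom store that only understands plain values
    final Map<String, byte[]> inner = new HashMap<>();

    // on write, strip the leading 8-byte timestamp before handing the
    // value to the plain-format store
    void put(final String key, final byte[] valueWithTimestamp) {
        final byte[] plain =
            Arrays.copyOfRange(valueWithTimestamp, 8, valueWithTimestamp.length);
        inner.put(key, plain);
    }

    // on read, prepend surrogate timestamp -1 ("unknown")
    byte[] get(final String key) {
        final byte[] plain = inner.get(key);
        if (plain == null) {
            return null;
        }
        return ByteBuffer.allocate(8 + plain.length)
            .putLong(-1L).put(plain).array();
    }

    public static void main(final String[] args) {
        final ProxyStoreSketch proxy = new ProxyStoreSketch();
        final byte[] value =
            ByteBuffer.allocate(9).putLong(123L).put((byte) 7).array();
        proxy.put("k", value);
        // the write-side timestamp is lost; reads see the surrogate -1
        System.out.println(ByteBuffer.wrap(proxy.get("k")).getLong()); // -1
        System.out.println(proxy.get("k")[8]);                        // 7
    }
}
```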

Compatibility, Deprecation, and Migration Plan

A simple rolling bounce upgrade is sufficient. For PAPI users, nothing changes at all. For DSL users, the internal RocksDBTimestampedStore (as one example) will upgrade the store data lazily in the background. Only if users provide a custom XxxByteStoreSupplier does no upgrade happen (though users can opt in by implementing the TimestampedBytesStore interface) and the old format is kept. In this case, we use a proxy store that removes the timestamp on write and adds it back on read.

To keep the Interactive Queries feature compatible, the new TimestampedXxxStores can be queried with and without timestamp. This is important for DSL users who might query a store as a KeyValueStore while the DSL switched the store internally to a TimestampedKeyValueStore. Thus, we allow querying a TimestampedKeyValueStore with the queryable store type "key-value" and remove the timestamp under the hood on read. Compatibility for window/session stores is handled similarly.
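The timestamp-stripping read path can be sketched as follows. A plain `Map` stands in for the store, and `ValueAndTimestamp` is re-modeled locally, so this is illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

public class ReadOnlyViewSketch {
    // minimal local stand-in for ValueAndTimestamp<V>
    static class ValueAndTimestamp<V> {
        final V value;
        final long timestamp;

        ValueAndTimestamp(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // "key-value" view over a timestamped store: drop the timestamp on read
    static <K, V> V plainGet(final Map<K, ValueAndTimestamp<V>> store, final K key) {
        final ValueAndTimestamp<V> vt = store.get(key);
        return vt == null ? null : vt.value;
    }

    public static void main(final String[] args) {
        final Map<String, ValueAndTimestamp<Long>> store = new HashMap<>();
        store.put("hits", new ValueAndTimestamp<>(42L, 1_000L));
        // a caller querying with the plain "key-value" store type sees
        // only the value, never the timestamp
        System.out.println(plainGet(store, "hits")); // 42
    }
}
```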

The TimestampedBytesStore interface will be implemented by Kafka Streams for the newly added stores. Users who implement custom key-value/window/session stores are not affected, as their stores will be wrapped by the proxy store. Users can "opt in", of course, and change their custom key-value/window/session store implementations to support the new TimestampedXxxStore interfaces by implementing TimestampedBytesStore in their own store classes. For other user-implemented stores that add a new StateStore type, nothing changes for existing code. Such stores are "black boxes" to Kafka Streams anyway and cannot be used in DSL operators; thus, it is the user's responsibility to implement those stores correctly if they implement TimestampedBytesStore.

Test Plan

This feature will mainly be tested via unit and integration tests:

...