...
- TimestampedKeyValueStore that extends the existing KeyValueStore
- TimestampedWindowStore that extends the existing WindowStore
- TimestampedSessionStore that extends the existing SessionStore
Those new stores will be used by DSL operators. For PAPI users, nothing changes for existing applications. Of course, the new stores can be used, too.
For a seamless single rolling bounce upgrade, we introduce a TimestampedBytesStore interface. This interface is implemented by byte-stores to indicate that they expect the new value+timestamp format. This is required for reading data from the changelog topic during restore – in our case, the concrete implementation will copy the timestamp from the record timestamp field into the value (upgrade details are discussed below). Additionally, the interface provides a static helper method to convert byte[] arrays in the old "plain value" format to the new "value+timestamp" format.
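The binary layout of the value+timestamp format is not spelled out here, so as a hedged illustration only, the following sketch assumes the new format prepends an 8-byte timestamp to the plain value and uses the surrogate -1 for "unknown"; the class name is hypothetical:

```java
import java.nio.ByteBuffer;

public class TimestampedFormatSketch {
    static final long UNKNOWN = -1L;  // surrogate timestamp ("unknown")

    // Hypothetical stand-in for the static conversion helper:
    // prepend an 8-byte timestamp to the old "plain value" bytes.
    static byte[] convertToTimestampedFormat(final byte[] plainValue) {
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
                .putLong(UNKNOWN)
                .put(plainValue)
                .array();
    }

    public static void main(String[] args) {
        final byte[] converted = convertToTimestampedFormat(new byte[] {1, 2, 3});
        System.out.println(converted.length);                      // 11
        System.out.println(ByteBuffer.wrap(converted).getLong());  // -1
    }
}
```

During restore, the actual implementation would use the changelog record's timestamp instead of the surrogate, as described above.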
Proposed Changes
To make use of the new timestamp that can be stored, we add new classes and interfaces to the existing store interfaces that allow reading/writing the timestamp and the value at once.
Code Block

package org.apache.kafka.streams.state;

// new classes and interfaces

public class ValueAndTimestamp<V> {
    V value();
    long timestamp();
}

public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    void put(K key, V value, long timestamp);
    ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);
}

public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {
    // note, we don't add `put(K key, V value, long timestamp)`
    void put(K key, V value, long timestamp, long windowStartTimestamp);
}

public interface TimestampedSessionStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {
    void put(final Windowed<K> sessionKey, final V aggregate, final long timestamp);
}

public interface TimestampedBytesStore {
    static byte[] convertToTimestampedFormat(final byte[] plainValue);
}

// extend existing classes (omitting existing methods)

public final class Stores {
    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde);
    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde);
    public static <K, V> StoreBuilder<TimestampedSessionStore<K, V>> timestampedSessionStoreBuilder(final SessionBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore();
    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> timestampedWindowStore();
    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>>> timestampedSessionStore();
}

// test-utils package

public class TopologyTestDriver {
    public <K, V> TimestampedKeyValueStore<K, V> getTimestampedKeyValueStore(final String name);
    public <K, V> TimestampedWindowStore<K, V> getTimestampedWindowStore(final String name);
    public <K, V> TimestampedSessionStore<K, V> getTimestampedSessionStore(final String name);
}
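ValueAndTimestamp is listed above only by its accessors; a minimal concrete version might look as follows (the private constructor and static make() factory are illustrative assumptions, not part of the declarations above):

```java
import java.util.Objects;

// Minimal concrete sketch of ValueAndTimestamp; the make() factory
// mirrors common Kafka Streams style but is an assumption here.
public final class ValueAndTimestamp<V> {
    private final V value;
    private final long timestamp;

    private ValueAndTimestamp(final V value, final long timestamp) {
        this.value = Objects.requireNonNull(value);
        this.timestamp = timestamp;
    }

    public static <V> ValueAndTimestamp<V> make(final V value, final long timestamp) {
        return new ValueAndTimestamp<>(value, timestamp);
    }

    public V value() { return value; }
    public long timestamp() { return timestamp; }

    public static void main(String[] args) {
        final ValueAndTimestamp<String> vt = ValueAndTimestamp.make("hello", 42L);
        System.out.println(vt.value() + "@" + vt.timestamp()); // hello@42
    }
}
```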
...
- For locally stored data, reads will be served with surrogate timestamp -1 (semantics is "unknown").
- On put, data will be stored using the new format.
- Key-Value store: new RocksDBTimestampedStore is used. To isolate old and new format, we use two column families. We perform dual put/get operations on the new and old column family to lazily upgrade all data from old to new format.
- Window/Session store: existing segments will use the old format and new segments will be created with the new format.
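The lazy key-value upgrade can be sketched as follows (a toy model only: two HashMaps stand in for the two RocksDB column families, and the 8-byte-timestamp-prefix layout is an assumption):

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class LazyUpgradeSketch {
    // Stand-ins for the old-format and new-format column families.
    final Map<String, byte[]> oldFamily = new HashMap<>();  // plain values
    final Map<String, byte[]> newFamily = new HashMap<>();  // timestamp + value

    byte[] get(final String key) {
        final byte[] hit = newFamily.get(key);
        if (hit != null) {
            return hit;
        }
        final byte[] plain = oldFamily.remove(key);
        if (plain == null) {
            return null;
        }
        // Found in old format: convert with surrogate timestamp -1 ("unknown")
        // and move the record to the new column family.
        final byte[] upgraded = ByteBuffer.allocate(Long.BYTES + plain.length)
                .putLong(-1L)
                .put(plain)
                .array();
        newFamily.put(key, upgraded);
        return upgraded;
    }

    void put(final String key, final byte[] valueWithTimestamp) {
        newFamily.put(key, valueWithTimestamp);  // writes always use the new format
        oldFamily.remove(key);                   // dual operation: purge old format
    }
}
```

Once every key has been read or written at least once, the old column family is empty and the upgrade is complete.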
The described upgrade path works for library-provided store implementations out-of-the-box. All new stores implement the TimestampedBytesStore interface, which indicates that the store can handle old and new format and performs a seamless upgrade internally.
There is one special case, for which users use a custom XxxByteStoreSupplier. For this case, the returned store won't implement TimestampedBytesStore and won't understand the new format. Hence, we use a proxy store to remove timestamps on write and add surrogate timestamp -1 (semantics is "unknown") on read (note that for non-persistent custom stores, we don't need to use the proxy). Using the proxy is required to guarantee a backward compatible upgrade path. Users implementing a custom XxxByteStoreSupplier can opt in and extend their stores to implement the TimestampedBytesStore interface, too. For this case, we don't wrap the store with a proxy and it's the user's responsibility to implement the custom store correctly to migrate from old to new format. Users implementing a custom store that implements TimestampedBytesStore and that want to upgrade existing data from old to new format can use the TimestampedBytesStore#convertToTimestampedFormat() method.
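The proxy idea can be sketched like this (a toy model: a HashMap stands in for the custom byte store that only understands plain values, and the 8-byte timestamp prefix is an assumed layout):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ProxyStoreSketch {
    // Stand-in for a custom byte store that only understands plain values.
    final Map<String, byte[]> customStore = new HashMap<>();

    // Write path: strip the (assumed) 8-byte timestamp prefix,
    // so the wrapped store keeps seeing the old format.
    void put(final String key, final byte[] valueWithTimestamp) {
        customStore.put(key,
                Arrays.copyOfRange(valueWithTimestamp, Long.BYTES, valueWithTimestamp.length));
    }

    // Read path: re-attach surrogate timestamp -1 ("unknown").
    byte[] get(final String key) {
        final byte[] plain = customStore.get(key);
        if (plain == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES + plain.length)
                .putLong(-1L)
                .put(plain)
                .array();
    }
}
```

The wrapped store never observes the new format, which is what makes the upgrade backward compatible for custom suppliers.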
Compatibility, Deprecation, and Migration Plan
Simple rolling bounce upgrade is sufficient. For PAPI users, nothing changes at all. For DSL users, the internal RocksDBTimestampedStore (as one example) will upgrade the store data lazily in the background. Only if users provide a custom XxxByteStoreSupplier does no upgrade happen (but users can opt in by implementing the TimestampedBytesStore interface) and the old format is kept. We use a proxy store that removes/adds the timestamp on write/read.
To keep the interactive queries feature compatible, the new TimestampedXxxStores can be queried with and without timestamp. This is important for DSL users that might query a store as KeyValueStore while the DSL switches the store internally to a TimestampedKeyValueStore. Thus, we allow querying a TimestampedKeyValueStore with queryable store type "key-value" and remove the timestamp under the hood on read. Compatibility for window/session stores is handled similarly.
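Removing the timestamp under the hood can be sketched like this (types are simplified stand-ins, not the actual Kafka Streams classes):

```java
import java.util.HashMap;
import java.util.Map;

public class UnwrapOnReadSketch {
    // Simplified stand-in for ValueAndTimestamp<V>.
    static final class ValueAndTimestamp<V> {
        final V value;
        final long timestamp;
        ValueAndTimestamp(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for the DSL's internal timestamped store.
    final Map<String, ValueAndTimestamp<String>> timestampedStore = new HashMap<>();

    // What a query with store type "key-value" would observe:
    // the timestamp is dropped on read.
    String getAsPlainKeyValue(final String key) {
        final ValueAndTimestamp<String> vt = timestampedStore.get(key);
        return vt == null ? null : vt.value;
    }

    // What a query with the new timestamped store type would observe.
    ValueAndTimestamp<String> getWithTimestamp(final String key) {
        return timestampedStore.get(key);
    }
}
```

Both views are served from the same underlying data, so existing queries keep working unchanged after the DSL switches to timestamped stores.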
The TimestampedBytesStore interface will be implemented by Kafka Streams for the newly added stores. Users who implement custom key-value/window/session stores are not affected, as their stores will be wrapped by the proxy store. Users can opt in, of course, and change their custom key-value/window/session store implementations to support the new TimestampedXxxStore interfaces by implementing TimestampedBytesStore in their own store classes. For other user-implemented stores that add a new StateStore type, nothing changes for existing code. Again, users can now also implement the TimestampedBytesStore interface; however, those stores are "black boxes" to Kafka Streams anyway and cannot be used in DSL operators, and thus it is the user's responsibility to implement their stores correctly if they use the TimestampedBytesStore interface. Note that TimestampedBytesStore#convertToTimestampedFormat() can be used to translate records stored in a changelog topic in the old format into the format used by the state store.
Test Plan
This feature can be mainly tested via unit and integration tests:
...