...
```java
package org.apache.kafka.streams.state;

// new classes and interfaces

public class ValueAndTimestamp<V> {
    public V value();
    public long timestamp();
    public static <V> ValueAndTimestamp<V> make(final V value, final long timestamp); // returns `null` if `value==null`
    public static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp); // returns `null` if `valueAndTimestamp==null`
}

public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedSessionStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedBytesStore {
    static byte[] convertToTimestampedFormat(final byte[] plainValue);
}

// extend existing classes (omitting existing methods)

public final class Stores {
    public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name);
    public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates);
    public static SessionBytesStoreSupplier persistentTimestampedSessionStore(final String name, final Duration retentionPeriod);

    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde);
    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde);
    public static <K, V> StoreBuilder<TimestampedSessionStore<K, V>> timestampedSessionStoreBuilder(final SessionBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore();
    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> timestampedWindowStore();
    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>>> timestampedSessionStore();
}

// test-utils package

public class TopologyTestDriver {
    public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name);
    public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name);
    public <K, V> SessionStore<K, ValueAndTimestamp<V>> getTimestampedSessionStore(final String name);
}
```
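To make the null-handling contract of `ValueAndTimestamp.make` and `getValueOrNull` concrete, here is a minimal standalone sketch that does not depend on the Kafka library. `SimpleValueAndTimestamp` and `toTimestampedFormat` are hypothetical names. The byte layout used for the conversion (an 8-byte big-endian timestamp prepended to the plain value, with `-1` marking an unknown timestamp for values written before the upgrade) is an assumption for illustration; the listing above only declares `convertToTimestampedFormat` without specifying its format.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class TimestampedFormatSketch {

    // Hypothetical stand-in for ValueAndTimestamp<V>; not the Kafka class.
    static final class SimpleValueAndTimestamp<V> {
        private final V value;
        private final long timestamp;

        private SimpleValueAndTimestamp(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        // Mirrors the listed contract: returns null if value == null.
        static <V> SimpleValueAndTimestamp<V> make(final V value, final long timestamp) {
            return value == null ? null : new SimpleValueAndTimestamp<>(value, timestamp);
        }

        // Mirrors the listed contract: returns null if the wrapper itself is null.
        static <V> V getValueOrNull(final SimpleValueAndTimestamp<V> valueAndTimestamp) {
            return valueAndTimestamp == null ? null : valueAndTimestamp.value;
        }

        V value() { return value; }
        long timestamp() { return timestamp; }
    }

    // Assumed marker for "timestamp unknown" when converting pre-upgrade values.
    static final long UNKNOWN_TIMESTAMP = -1L;

    // Assumed layout: [8-byte timestamp][plain value bytes]; null (e.g., a delete) stays null.
    static byte[] toTimestampedFormat(final byte[] plainValue) {
        if (plainValue == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
                .putLong(UNKNOWN_TIMESTAMP)
                .put(plainValue)
                .array();
    }

    public static void main(final String[] args) {
        final SimpleValueAndTimestamp<String> vt = SimpleValueAndTimestamp.make("v1", 42L);
        System.out.println(vt.value() + "@" + vt.timestamp()); // v1@42

        final SimpleValueAndTimestamp<String> dropped = SimpleValueAndTimestamp.make(null, 42L);
        System.out.println(dropped); // null
        final String missing = SimpleValueAndTimestamp.getValueOrNull(null);
        System.out.println(missing); // null

        final byte[] converted = toTimestampedFormat(new byte[] {1, 2, 3});
        System.out.println(converted.length); // 11
        System.out.println(ByteBuffer.wrap(converted).getLong()); // -1
        System.out.println(Arrays.toString(Arrays.copyOfRange(converted, 8, 11))); // [1, 2, 3]
    }
}
```

Returning `null` from `make` for a `null` value keeps tombstones (deletes) as plain `null`s rather than wrapping them, so callers can keep using a simple null check.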
...
- background upgrade (i.e., use the existing store and migrate it to the new format in the background; switch over to the new format later)
  - requires a complex upgrade path with in-place/roll-over configuration and two rolling bounces
- change the RocksDB on-disk format and encode the used serialization version per record (this would simplify future upgrades). However, there are major disadvantages:
  - storage amplification for local stores
  - the record version could be stored in record headers in changelog topics; however, the changelog topic might then never overwrite records written in the older format
  - for future releases, the code needs to check all versions all the time, which increases code complexity and runtime overhead
  - it's hard to change the key format
    - for the value format, the version number can be a magic prefix byte
    - for key lookups, we would need to know the magic byte in advance to issue efficient point queries into RocksDB; if multiple versions exist in parallel, this is difficult (either issue multiple queries with different version bytes until the entry is found, or until all versions have been tried, implying the key does not exist; or use range queries, but those are very expensive)
- encode the storage format in the directory name not as a "store version number" but as an "AK release number"
  - might confuse users if the store format does not change ("I am running Kafka 1.4, but the store indicates it's running on 1.2")
- use a simpler offline upgrade path without any configs or complex rolling-bounce scenarios
  - requires application downtime to upgrade to the new format
- use the consumer's built-in protocol upgrade mechanism (i.e., register multiple "assignment strategies")
  - has the disadvantage that we need to implement two StreamsPartitionAssignor classes
  - increased network traffic during rebalance
  - encoding the "supported version" in the metadata subsumes this approach for future releases anyway
  - if we want to "disable" the old protocol, a second rebalance is required, too
  - cannot avoid the second rebalance that is required for the state store upgrade
- only support the in-place upgrade path instead of two paths, to simplify the process for users (they don't need to pick one)
  - might be prohibitive if not enough disk space is available
- allow DSL users to stay with the old format: the upgrade would be simpler, as it requires only one rolling bounce
  - unclear default behavior: should we stay on the 2.1 format by default, or should we use the 2.2 format by default?
    - if 2.1 is the default, the upgrade is simple, but users who write a new application must turn on the 2.2 format explicitly
    - if 2.2 is the default, a simple upgrade requires a config that tells Streams to stay with the 2.1 format
    - conclusion: either way, one of the two paths (upgrading or not upgrading) is not straightforward; thus, just force the upgrade
  - if no upgrade happens, the new features (as listed above) would be useless
- only prepare stores for active tasks (but not for standby tasks)
  - this would reduce the disk footprint during the upgrade
  - disadvantage: when the switch to the new version happens, no hot standbys are available for some time
  - we could make it configurable; however, we try to keep the number of configs small, and the upgrade is already complex enough that adding more options would make it worse
  - it's not an issue for the roll-over upgrade, and not everybody configures standbys in the first place
    - people with standbys are willing to provide more disk, so it seems a fair assumption that they are fine with the roll-over upgrade, too
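The key-lookup objection to per-record versioning can be illustrated with a minimal sketch. A `TreeMap` with lexicographic byte ordering stands in for RocksDB, and the two version bytes and all class and method names are hypothetical. Because the magic byte is a key prefix, a point lookup that does not know a record's version must probe each coexisting version in turn, so a missing key costs one lookup per version.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.TreeMap;

public class VersionedKeyLookupSketch {

    // Hypothetical record-format versions that may coexist in one store during an upgrade.
    static final byte[] VERSIONS = {1, 2};

    // Stand-in for RocksDB: keys sorted lexicographically by their raw bytes.
    private final TreeMap<byte[], String> db = new TreeMap<byte[], String>((a, b) -> Arrays.compare(a, b));

    // Counts simulated point lookups (RocksDB get() calls).
    int pointLookups = 0;

    // Stored key layout: [magic version byte][UTF-8 serialized key].
    static byte[] versionedKey(final byte version, final String key) {
        final byte[] raw = key.getBytes(StandardCharsets.UTF_8);
        final byte[] out = new byte[raw.length + 1];
        out[0] = version;
        System.arraycopy(raw, 0, out, 1, raw.length);
        return out;
    }

    void put(final byte version, final String key, final String value) {
        db.put(versionedKey(version, key), value);
    }

    // The version is unknown up front: try each one until a hit.
    // Only after all versions miss can we conclude the key does not exist.
    String get(final String key) {
        for (final byte version : VERSIONS) {
            pointLookups++;
            final String value = db.get(versionedKey(version, key));
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(final String[] args) {
        final VersionedKeyLookupSketch store = new VersionedKeyLookupSketch();
        store.put((byte) 1, "old-key", "written before upgrade");
        store.put((byte) 2, "new-key", "written after upgrade");

        System.out.println(store.get("old-key")); // written before upgrade
        System.out.println(store.pointLookups);   // 1
        System.out.println(store.get("new-key")); // written after upgrade
        System.out.println(store.pointLookups);   // 3
        System.out.println(store.get("missing")); // null
        System.out.println(store.pointLookups);   // 5
    }
}
```

A range query is not shown: with the version byte as a prefix, entries of different versions for the same logical key end up in different regions of the sorted key space, so they cannot be fetched with a single short scan.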
...