Code Block
package org.apache.kafka.streams.state;

// new classes and interfaces

public class ValueAndTimestamp<V> {
    public V value();
    public long timestamp();
    public static <V> ValueAndTimestamp<V> make(final V value, final long timestamp); // returns `null` if `value==null`
    public static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp); // returns `null` if `valueAndTimestamp==null`
}

public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedSessionStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {}

public interface TimestampedBytesStore {
    static byte[] convertToTimestampedFormat(final byte[] plainValue);
}

// extend existing classes (omitting existing method)

public final class Stores {
    public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name);

    public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
                                                                            final Duration retentionPeriod,
                                                                            final Duration windowSize,
                                                                            final boolean retainDuplicates);

    public static SessionBytesStoreSupplier persistentTimestampedSessionStore(final String name,
                                                                              final Duration retentionPeriod);

    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                      final Serde<K> keySerde,
                                                                                                      final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
                                                                                                  final Serde<K> keySerde,
                                                                                                  final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<TimestampedSessionStore<K, V>> timestampedSessionStoreBuilder(final SessionBytesStoreSupplier supplier,
                                                                                                    final Serde<K> keySerde,
                                                                                                    final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore();

    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> timestampedWindowStore();

    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>>> timestampedSessionStore();
}

// test-utils package

public class TopologyTestDriver {
    public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name);

    public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name);

    public <K, V> SessionStore<K, ValueAndTimestamp<V>> getTimestampedSessionStore(final String name);
}
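
The null-handling contract noted in the comments on `make` and `getValueOrNull` can be illustrated with a small stand-alone sketch. The class `ValueAndTimestampSketch` below is a hypothetical stand-in written only for illustration. It is not the Kafka class, and it assumes nothing beyond the two comments in the listing above.

```java
import java.util.Objects;

// Hypothetical stand-in for the proposed ValueAndTimestamp<V>; illustration only.
final class ValueAndTimestampSketch<V> {
    private final V value;
    private final long timestamp;

    // Instances are only created through make(), so value is never null here.
    private ValueAndTimestampSketch(final V value, final long timestamp) {
        this.value = Objects.requireNonNull(value);
        this.timestamp = timestamp;
    }

    // Mirrors the documented contract of make(): returns null if value == null.
    static <V> ValueAndTimestampSketch<V> make(final V value, final long timestamp) {
        return value == null ? null : new ValueAndTimestampSketch<>(value, timestamp);
    }

    // Mirrors the documented contract of getValueOrNull(): returns null if the wrapper is null.
    static <V> V getValueOrNull(final ValueAndTimestampSketch<V> valueAndTimestamp) {
        return valueAndTimestamp == null ? null : valueAndTimestamp.value();
    }

    V value() {
        return value;
    }

    long timestamp() {
        return timestamp;
    }

    public static void main(final String[] args) {
        final ValueAndTimestampSketch<String> present = make("a", 42L);
        final ValueAndTimestampSketch<String> absent = make(null, 42L);
        // A null wrapper unwraps to a null value, so callers need no extra null check.
        System.out.println(getValueOrNull(present));
        System.out.println(getValueOrNull(absent));
    }
}
```

With this contract, a null value never gets wrapped, so a lookup result can be unwrapped with `getValueOrNull` regardless of whether the key was found.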

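The listing above only gives the signature of `convertToTimestampedFormat`; it does not spell out the byte layout. As a minimal sketch, assume the timestamped format is the plain value prefixed with an 8-byte timestamp, and that a placeholder of `-1` marks "timestamp unknown" for values written in the old format. Both the layout and the placeholder are assumptions made for this illustration, as are the class name `TimestampedFormatSketch` and the two helper readers.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustration only: assumed layout is [8-byte timestamp][plain value bytes].
final class TimestampedFormatSketch {
    // Assumption: -1 stands for "timestamp unknown" when converting old-format values.
    static final long UNKNOWN_TIMESTAMP = -1L;

    // Converts a plain value into the assumed timestamped layout.
    static byte[] convertToTimestampedFormat(final byte[] plainValue) {
        if (plainValue == null) {
            return null; // a null value (tombstone) stays null
        }
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
            .putLong(UNKNOWN_TIMESTAMP)
            .put(plainValue)
            .array();
    }

    // Hypothetical helper: reads the timestamp prefix back.
    static long timestampOf(final byte[] timestampedValue) {
        return ByteBuffer.wrap(timestampedValue).getLong();
    }

    // Hypothetical helper: strips the timestamp prefix again.
    static byte[] plainValueOf(final byte[] timestampedValue) {
        return Arrays.copyOfRange(timestampedValue, Long.BYTES, timestampedValue.length);
    }

    public static void main(final String[] args) {
        final byte[] converted = convertToTimestampedFormat(new byte[] {1, 2, 3});
        System.out.println(converted.length);
        System.out.println(timestampOf(converted));
    }
}
```

Under these assumptions the conversion is a pure function of the old bytes, which is what allows an old-format store to be read and rewritten record by record.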

  • background upgrade (i.e., use the existing store and migrate to the new format in the background; switch over to the new format later)
    • requires complex upgrade path with in-place/roll-over configuration and two rolling bounces
  • change the RocksDB on-disk format and encode the serialization version used per record (this would simplify future upgrades). However, there are major disadvantages:
    • storage amplification for local stores
    • record version could get stored in record headers in changelog topics -> changelog topic might never overwrite record with older format
    • code needs to check all versions all the time in future releases: this increases code complexity and runtime overhead
    • it's hard to change the key format
      • for value format, the version number can be a magic prefix byte
      • for key lookup, we would need to know the magic byte in advance for efficient point queries into RocksDB; if multiple versions exist in parallel, this is difficult (either issue multiple queries with different version bytes until the entry is found or all versions have been tried, which implies the key does not exist, or use range queries, which are very expensive)
  • encode the storage format in the directory name not as "store version number" but as "AK release number"
    • might be confusing to users if the store format does not change ("I am running Kafka 1.4, but the store indicates it's running on 1.2").
  • use a simpler offline upgrade path without any configs or complex rolling bounce scenarios
    • requires application down-time for upgrading to new format
  • use the consumer's built-in protocol upgrade mechanism (i.e., register multiple "assignment strategies")
    • has the disadvantage that we need to implement two StreamsPartitionAssignor classes
    • increased network traffic during rebalance
    • encoding "supported version" in metadata subsumes this approach for future releases anyway
    • if we want to "disable" the old protocol, a second rebalance is required, too
    • cannot avoid the second rebalance that is required for the state store upgrade
  • only support in-place upgrade path instead of two to simplify the process for users (don't need to pick)
    • might be prohibitive if not enough disk space is available
  • allow DSL users to stay with old format: upgrade would be simpler as it's only one rolling bounce
    • unclear default behavior: should we stay on 2.1 format by default or should we use 2.2 format by default?
      • if 2.1 is the default, the upgrade is simple, but users writing a new application must turn on the 2.2 format explicitly
      • if 2.2 is the default, a simple upgrade requires a config that tells Streams to stay with the 2.1 format
      • conclusion: neither upgrading nor staying on the old format is straightforward, so we simply force the upgrade
    • if no upgrade happens, new features (as listed above) would be useless
  • only prepare stores for active tasks (but not standby tasks)
    • this would reduce the disk footprint during upgrade
    • disadvantage: when the switch to the new version happens, no hot standbys are available for some time
    • we could make it configurable, however, we try to keep the number of configs small; also, it's already complex enough and adding more options would make it worse
    • it's not an issue for roll-over upgrade and not everybody configures Standbys in the first place
      • people with standbys are willing to provide more disk, so it seems a fair assumption that they are fine with roll-over upgrade, too
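
The key-format concern raised above for per-record versioning can be made concrete with a small sketch. It assumes a version byte is prepended to every key and uses an in-memory map as a hypothetical stand-in for RocksDB. The class `VersionedKeyLookupSketch` and its methods are invented for illustration and are not part of any Kafka or RocksDB API.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Illustration only: point lookups when keys carry a version ("magic") byte prefix.
final class VersionedKeyLookupSketch {
    // Hypothetical in-memory stand-in for the RocksDB instance.
    private final Map<ByteBuffer, byte[]> store = new HashMap<>();

    // Counts point queries issued, to make the lookup cost visible.
    int probes = 0;

    // Builds the stored key: [1-byte version][key bytes].
    static ByteBuffer versionedKey(final byte version, final byte[] key) {
        final ByteBuffer buffer = ByteBuffer.allocate(1 + key.length);
        buffer.put(version).put(key);
        buffer.flip(); // make the written bytes the buffer's content for equals/hashCode
        return buffer;
    }

    void put(final byte version, final byte[] key, final byte[] value) {
        store.put(versionedKey(version, key), value);
    }

    // Tries every known version in turn; a miss costs one probe per version.
    byte[] get(final byte[] key, final byte[] knownVersions) {
        for (final byte version : knownVersions) {
            probes++;
            final byte[] value = store.get(versionedKey(version, key));
            if (value != null) {
                return value;
            }
        }
        return null; // all versions tried: the key does not exist
    }

    public static void main(final String[] args) {
        final VersionedKeyLookupSketch sketch = new VersionedKeyLookupSketch();
        final byte[] versions = {2, 1}; // newest first
        sketch.put((byte) 1, new byte[] {7}, new byte[] {9});
        sketch.get(new byte[] {7}, versions); // found under the older version
        sketch.get(new byte[] {8}, versions); // not present under any version
        System.out.println(sketch.probes);
    }
}
```

In this sketch every lookup of a key stored under an older version, and every lookup of a missing key, pays one point query per known version, which is the overhead the bullet above describes.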
