...

The challenging part of this KIP is to define a smooth upgrade path with the upgraded RocksDB format. There are some initial thoughts on KAFKA-3522 already. The rest of this doc will first focus on the public interface additions for the upgrade path. Storing timestamps in RocksDB is just a special case though, and we propose to allow for a generic upgrade path from any storage format A to any other storage format B. This includes local storage in RocksDB as well as the underlying changelog topic.

Public Interfaces

We will add a new configuration parameter upgrade.mode that will be null by default and can take three values, null, "in_place", and "roll_over", with the following semantics:

  • null: no upgrade needed, run with latest formats
  • "in_place": prepare an in-place "standby" RocksDB with new format
  • "roll_over": prepare a roll-over "standby" RocksDB with new format
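
As a minimal sketch of how the three values above could be interpreted at startup: the enum `UpgradeMode` and the helper `parse` are hypothetical names used only for illustration and are not part of the proposed API. Only the config name `upgrade.mode` and its values come from the text.

```java
import java.util.Collections;
import java.util.Map;

final class UpgradeModeExample {

    /** Hypothetical enum mirroring the three proposed values of upgrade.mode. */
    enum UpgradeMode { NONE, IN_PLACE, ROLL_OVER }

    /** Maps the raw config value to a mode; a missing value means "no upgrade needed". */
    static UpgradeMode parse(final Map<String, String> config) {
        final String value = config.get("upgrade.mode");
        if (value == null) {
            return UpgradeMode.NONE;
        }
        switch (value) {
            case "in_place":
                return UpgradeMode.IN_PLACE;
            case "roll_over":
                return UpgradeMode.ROLL_OVER;
            default:
                throw new IllegalArgumentException("Unknown upgrade.mode: " + value);
        }
    }

    public static void main(final String[] args) {
        System.out.println(parse(Collections.emptyMap()));
        System.out.println(parse(Collections.singletonMap("upgrade.mode", "roll_over")));
    }
}
```

Rejecting unknown values early is a design choice of this sketch; it keeps a typo in the config from silently disabling the upgrade.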

We add a new store type KeyValueWithTimestampStore that extends the existing KeyValueStore.

We generalize the translation from a changelog ConsumerRecord into a store KeyValue pair using a new interface RecordConverter – there will be a default implementation that does a 1:1 mapping from key to key and value to value. For the new KeyValueWithTimestampStore, we implement a mapping from key to key and from `value plus timestamp` to value.

We introduce an interface RecordConverterStore that translates from the old storage format to the new storage format.

We introduce an interface StoreUpgradeBuilder that extends StoreBuilder and can return a "store proxy" that maps from the new store API to an internally used old store, and that can return a RecordConverterStore to perform the actual store upgrade. Thus, StoreUpgradeBuilder can return a proxy store, an upgrade store, and the new store.

Proposed Changes

To make use of the new timestamp that can be stored, we need to add new interfaces to the existing store interfaces that allow reading and writing the timestamp and the value at once.

Note that only DSL users will be affected, because we want to switch the default stores. 

Public Interfaces

We add three new store types:

  • KeyValueWithTimestampStore that extends the existing KeyValueStore
  • WindowWithTimestampStore that extends the existing WindowStore
  • SessionWithTimestampStore that extends the existing SessionStore

Those new stores will be used by DSL operators. For PAPI users, nothing changes for existing applications. Of course, the new stores can be used, too.

For a seamless single rolling bounce upgrade, we introduce a RecordConverter interface. This interface can be used to transform binary data from the old format into the new format. This is required for reading data from the changelog topic during restore – the timestamp must be copied from the record timestamp field into the value. Upgrade details are discussed below.

Proposed Changes

To make use of the new timestamp that can be stored, we need to add new interfaces to the existing store interfaces that allow reading and writing the timestamp and the value at once.

Code Block
languagejava
package org.apache.kafka.streams.state;

// new interfaces

public interface ValueAndTimestamp<V> {
    V value();
    long timestamp();
}


public interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    void put(K key, V value, long timestamp);
    ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);
}

public interface WindowWithTimestampStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {
    // note, we don't add `put(K key, V value, long timestamp)`
    void put(K key, V value, long timestamp, long windowStartTimestamp);
}

public interface SessionWithTimestampStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {
    void put(final Windowed<K> sessionKey, final V aggregate, final long timestamp);
}

public interface RecordConverter {
    ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record);
}

// extend existing classes (omitting existing method)

public final class Stores {
    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueWithTimestampStoreBuilder(
        final KeyValueBytesStoreSupplier supplier,
        final Serde<K> keySerde,
        final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder(
        final KeyValueBytesStoreSupplier supplier,
        final Serde<K> keySerde,
        final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<WindowWithTimestampStore<K, V>> windowWithTimestampStoreBuilder(
        final WindowBytesStoreSupplier supplier,
        final Serde<K> keySerde,
        final Serde<V> valueSerde);

    // TODO: add missing xxxStoreBuilder() methods for WindowWithTimestampStore and SessionWithTimestampStore
}

public final class QueryableStoreTypes {
    // verify if we need this, or if we can reuse keyValueStore() directly with V==ValueAndTimestamp<V>
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> keyValueWithTimestampStore();

    // TODO add missing methods for WindowWithTimestampStore and SessionWithTimestampStore (if required)
}

We extend our existing stores (RocksDB, InMemory) to implement the corresponding new interfaces. Note, we will keep the old stores and extend them to give PAPI users the choice to use stores with or without the ability to store timestamps.

The usage/implementation of upgrade stores is described in the next section.

Upgrading

For a clean upgrade path for RocksDB itself, we need to introduce the above configs and new interfaces to implement the actual upgrade code. We will provide implementations to upgrade from an existing KeyValueStore to a KeyValueWithTimestampStore. The defined interfaces are generic though, which allows implementing "upgrade stores" from any store type A to any other store type B.

To make the upgrade possible, we generalize the changelog-to-store mapping from ConsumerRecords to KeyValue pairs; this allows customizing the translation during a store upgrade. To upgrade a store, a user updates her topology to use a `StoreUpgradeBuilder` that can return a proxy store, an upgrade store, and a regular store. If upgrade mode is enabled, the runtime creates a proxy store for StreamTasks that implements the new store API but internally maps back to the old store (ie, the old on-disk format). Using the proxy store allows running new code against the already existing old store, which avoids any downtime. At the same time, the runtime creates StoreUpgradeTasks that use `RecordConverterStore`. For those StoreUpgradeTasks, the changelog topic is consumed and the record format is updated to the new format using the `RecordConverter` interface; afterwards, the new format is written to the (new) RocksDB (and back to the changelog topic if required). Because the changelog topic may contain data in different formats, we encode the format in the record header and enhance all changelog readers to check the header for the correct version.

If upgrade mode is disabled, `StoreUpgradeBuilder` is used as a regular store builder that returns the new store. The difference between using the upgrade builder and using a builder for the new store directly is twofold: (1) Using StoreUpgradeBuilder, we can perform a final check whether the upgrade has finished. If not, we first create no active tasks, but only StoreUpgradeTasks using an upgrade store. Only after the upgrade is finished are the StoreUpgradeTasks destroyed and regular tasks using the new stores created. (2) After a successful upgrade, users don't need to rewrite their code (of course, they still can rewrite the code after a successful upgrade and replace `StoreUpgradeBuilder` with a store builder for the new store only).

As mentioned, we generalize the changelog-to-store mapping from ConsumerRecords to KeyValue pairs. If a store (like KeyValueWithTimestampStore) requires a non-default mapping, the corresponding `StateRestoreCallback` must implement `RecordConverter`, too.

Code Block
languagejava
package org.apache.kafka.streams.processor;

/**
 * {@code RecordConverter} translates a {@link ConsumerRecord} into a serialized value.
 */
public interface RecordConverter {

    /**
     * Convert a given record into a value for local storage.
     *
     * @param record the consumer record
     * @return the value for local storage
     */
    byte[] convert(final ConsumerRecord<byte[], byte[]> record);

}



/****************************************************************************************************/
package org.apache.kafka.streams.processor;

/**
 * {@code RecordConverterStore} translates a changelog record from old store format to new store format.
 */
public interface RecordConverterStore extends StateStore, RecordConverter {
}



/****************************************************************************************************/
package org.apache.kafka.streams.processor;

/**
 * {@code StoreUpgradeBuilder} that provides a store as well as corresponding proxy store and converter store.
 * <p>
 * The proxy store maps from a new store API to an old store API.
 * It is responsible to map from the new API calls to the old API during store upgrade phase.
 * <p>
 * The converter store is a store for the new format.
 * Additionally, it implements a {@link RecordConverter} that is used to translate from the old storage format to the new storage format.
 *
 * @param <S> store type (proxy store must implement this store type, too)
 * @param <C> new store type that additionally implements {@link RecordConverter}
 */
public interface StoreUpgradeBuilder<S extends StateStore, C extends RecordConverterStore> extends StoreBuilder<S> {

    /**
     * Return a new instance of the proxy store.
     * @return a new instance of the proxy store
     */
    S storeProxy();

    /**
     * Return a new instance of the converter store
     * @return a new instance of converter store
     */
    C converterStore();
}

For the actual RocksDB storage format for KeyValueWithTimestampStore, we add the record timestamp as an 8-byte (long) prefix to the value; ie, we change the format from <key:value> to <key:timestamp+value>. We need to introduce a new value serde that wraps the original value serde as well as a long serde. One important detail is that the serde only changes for the store, but not for the changelog topic: the underlying changelog topic stores the timestamp in the record metadata timestamp field already. We need to intercept the write to the changelog topic accordingly.
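
The <key:timestamp+value> layout can be illustrated with a minimal sketch that works on raw bytes only. The class and method names are hypothetical, and the big-endian encoding of the long (the `ByteBuffer` default) is an assumption of this sketch; the text above only fixes the 8-byte prefix.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class TimestampedValueFormat {

    /** Prepends the 8-byte timestamp to the already serialized value. */
    static byte[] encode(final long timestamp, final byte[] value) {
        return ByteBuffer.allocate(Long.BYTES + value.length)
            .putLong(timestamp)
            .put(value)
            .array();
    }

    /** Reads the timestamp prefix of a stored value. */
    static long timestampOf(final byte[] stored) {
        return ByteBuffer.wrap(stored).getLong();
    }

    /** Strips the timestamp prefix and returns the original serialized value. */
    static byte[] valueOf(final byte[] stored) {
        return Arrays.copyOfRange(stored, Long.BYTES, stored.length);
    }

    public static void main(final String[] args) {
        final byte[] stored = encode(1500L, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(timestampOf(stored));
        System.out.println(new String(valueOf(stored), StandardCharsets.UTF_8));
    }
}
```

When writing to the changelog topic, only the part returned by `valueOf` would be sent as the record value, since the timestamp already travels in the record's timestamp field.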

For the actual upgrade, which might be "in place", we need to make sure to use different directories. Thus, StoreUpgradeTasks create store directories with suffix _prepare; ie, for each active task with task directory `X_Y`, a StoreUpgradeTask will use task directory `X_Y_prepare`. The directory isolation at task level ensures that we can reuse the same internal store directory structure for active and store-upgrade tasks. After the stores are restored, in a second rebalance, the old task directory will be renamed, the "prepare" directory will be renamed to act as the new active task directory, and finally we delete the renamed original task directory to free up the disk space.
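
The three-step directory swap described above (rename old, promote prepare, delete old) could look roughly as follows. This is a sketch on the plain file system; the `_old` suffix for the renamed original directory and all class and method names are assumptions, as the text only specifies the `_prepare` suffix.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PrepareDirectorySwap {

    /** Promotes X_Y_prepare to X_Y and removes the previous X_Y directory. */
    static void swap(final Path stateDir, final String taskId) {
        final Path active = stateDir.resolve(taskId);
        final Path prepare = stateDir.resolve(taskId + "_prepare");
        final Path retired = stateDir.resolve(taskId + "_old"); // hypothetical suffix
        if (!Files.isDirectory(prepare)) {
            return; // nothing was prepared, keep the active directory as-is
        }
        try {
            if (Files.exists(active)) {
                Files.move(active, retired);   // step 1: rename the old task directory
            }
            Files.move(prepare, active);       // step 2: prepare becomes the active directory
            deleteRecursively(retired);        // step 3: free up the disk space
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void deleteRecursively(final Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        final List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // reverse order so that files are deleted before their parent directory
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (final Path path : paths) {
            Files.delete(path);
        }
    }

    /** Runs the swap on a temporary state directory and reports whether the layout is as expected. */
    static boolean demo() {
        try {
            final Path stateDir = Files.createTempDirectory("state");
            Files.createDirectories(stateDir.resolve("0_1"));
            Files.write(stateDir.resolve("0_1").resolve("old-format"), new byte[] {1});
            Files.createDirectories(stateDir.resolve("0_1_prepare"));
            Files.write(stateDir.resolve("0_1_prepare").resolve("new-format"), new byte[] {2});
            swap(stateDir, "0_1");
            return Files.exists(stateDir.resolve("0_1").resolve("new-format"))
                && !Files.exists(stateDir.resolve("0_1").resolve("old-format"))
                && !Files.exists(stateDir.resolve("0_1_prepare"))
                && !Files.exists(stateDir.resolve("0_1_old"));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        System.out.println("swap succeeded: " + demo());
    }
}
```

Renaming before deleting keeps the window in which no usable task directory exists as short as possible; a crash between the steps leaves either the old or the new directory intact.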

Compatibility, Deprecation, and Migration Plan

The storage format change requires applications to be upgraded correctly. We plan to offer two upgrade paths: in-place (with an online and an offline variant) and roll-over.

  • In-place upgrade (online): this requires two rolling bounces of each application instance
    • advantage: simpler than roll-over upgrade
    • disadvantage: need 2x local disk storage during upgrade
    • upgrade flow:
      1. prepare a jar hot swap from old version; Kafka Streams needs to be configured with upgrade.mode="in_place" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
      3. because upgrade mode is "in place", each instance will create a "store upgrade" task for each assigned active and standby task, which starts to rebuild the RocksDB stores in the new format in parallel to the existing RocksDB
        1. existing RocksDB will be used for processing so there is no down time
        2. each instance builds up the new RocksDB state for all its assigned active and standby tasks
        3. during restore, users need to observe restore progress (as actual processing resumes, restore will never be "finished", and it's up to the user when to trigger the second rolling bounce)
      4. after all stores are prepared, the user prepares a second rolling bounce; this time, the configuration parameter upgrade.mode must be removed for the new startup
      5. do a second rolling bounce for each instance to get new config
        1. if prepare task directories are detected, we check if the upgrade is finished and, if not, finish the upgrade (ie, read the latest delta from the changelog); then the corresponding active directories are replaced by the prepare task directories
        2. afterwards regular processing begins
  • In-place upgrade (offline): requires a single rolling bounce of each application instance
    • advantage: simplest way to upgrade (only one rolling bounce)
    • disadvantage: needs 2x local disk storage during upgrade; application is offline during store upgrade
    • upgrade flow:
      1. prepare a jar hot swap from old version; the Kafka Streams configuration does not need to be changed (ie, upgrade.mode=null for startup)
      2. do a rolling bounce to get the new jar in place for each instance
      3. because upgrade mode is disabled, each instance only finds the old store format but no new store (thus it behaves exactly the same as in the second rolling bounce of the "in_place" online upgrade)
        1. create the prepare task directory and replay the "latest delta" from the changelog topic (it happens that this delta is the complete changelog topic)
        2. after restore has finished (no processing during this time), the old store is replaced with the new store
        3. afterwards regular processing begins
  • Roll-over upgrade:
    • in-place upgrade might not be feasible because of its large local disk storage requirement; thus, roll-over is an alternative to enable upgrading if not enough local disk space is available for an in-place upgrade
    • if an application is running with N instances, the user starts N new instances in parallel (old and new form one consumer group); the new instances rebuild the RocksDB state with the new format; finally, the old instances are stopped
    • upgrade flow:
      1. prepare a jar hot swap from old version for all existing (old) instances; old Kafka Streams needs to be configured with upgrade.mode=null for startup
      2. do a rolling bounce to get the new jar for each old instance
        1. all old instances will just resume processing as usual
        2. because upgrade mode is not enabled no store upgrade tasks are started (cf. "In-place" upgrade above)
      3. the user starts N new instances with config parameter upgrade.mode="roll_over"
        1. the `roll_over` config will be encoded in the subscription metadata
        2. the leader can distinguish between old and new instances based on the used Subscription encoded information and assign tasks (active and restore) for restore to the "roll over" instances (ie, the assignment is "mirrored" to all "roll over" instances)
        3. "roll over" instances only create StoreUpgradeTasks and perform the restore
      4. all old instances are stopped
        1. as long as at least one old instance is alive, the leader assigns all active tasks to the old instances only
        2. however, as those are shut down in a row, ideally a rebalance will only trigger after all are down already (note, Kafka Streams does not send a "leave group request" and thus a rebalance only happens after session.timeout passes)
        3. when all old instances are down, the leader only receives subscriptions that encode "roll over" and thus the leader knows that the upgrade is completed
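
The leader's decision in the roll-over flow can be sketched as follows. This is a simplified illustration, not the actual assignor: the `Subscription` and `Assignment` classes are hypothetical stand-ins, and the round-robin distribution is an assumption made to keep the example short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RollOverAssignmentSketch {

    /** Hypothetical stand-in for the subscription metadata sent by each instance. */
    static final class Subscription {
        final String instanceId;
        final boolean rollOver; // true if the instance runs with upgrade.mode="roll_over"

        Subscription(final String instanceId, final boolean rollOver) {
            this.instanceId = instanceId;
            this.rollOver = rollOver;
        }
    }

    static final class Assignment {
        final Map<String, List<String>> activeTasks = new LinkedHashMap<>();
        final Map<String, List<String>> upgradeTasks = new LinkedHashMap<>();
        boolean upgradeCompleted;
    }

    static Assignment assign(final List<Subscription> subscriptions, final List<String> tasks) {
        final List<String> oldInstances = new ArrayList<>();
        final List<String> rollOverInstances = new ArrayList<>();
        for (final Subscription subscription : subscriptions) {
            (subscription.rollOver ? rollOverInstances : oldInstances).add(subscription.instanceId);
        }

        final Assignment assignment = new Assignment();
        // Only "roll over" subscriptions left: the old instances are gone, the upgrade is done.
        assignment.upgradeCompleted = oldInstances.isEmpty() && !rollOverInstances.isEmpty();
        if (assignment.upgradeCompleted) {
            distribute(tasks, rollOverInstances, assignment.activeTasks);
        } else {
            // Old instances keep all active tasks; the assignment is mirrored as upgrade tasks.
            distribute(tasks, oldInstances, assignment.activeTasks);
            distribute(tasks, rollOverInstances, assignment.upgradeTasks);
        }
        return assignment;
    }

    private static void distribute(final List<String> tasks,
                                   final List<String> instances,
                                   final Map<String, List<String>> target) {
        if (instances.isEmpty()) {
            return;
        }
        for (final String instance : instances) {
            target.put(instance, new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            target.get(instances.get(i % instances.size())).add(tasks.get(i));
        }
    }

    public static void main(final String[] args) {
        final List<Subscription> mixed = new ArrayList<>();
        mixed.add(new Subscription("old-1", false));
        mixed.add(new Subscription("new-1", true));
        final List<String> tasks = new ArrayList<>();
        tasks.add("0_0");
        tasks.add("0_1");
        final Assignment assignment = assign(mixed, tasks);
        System.out.println("active: " + assignment.activeTasks);
        System.out.println("upgrade: " + assignment.upgradeTasks);
    }
}
```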

Upgrade scenarios:

Processor API:

  • using KeyValueStore/WindowStore/SessionStore and want to keep them
    • nothing to do; regular single rolling bounce upgrade
  • using KeyValueStore/WindowStore/SessionStore and want to upgrade to XxxWithTimestampStore
    • update code to use new store and provide corresponding `StoreUpgradeBuilder`
    • follow instructions as described above

DSL users (after KAFKA-6521 is implemented):

  • by default, users should consider the three available upgrade paths
    • if they don't pay attention, they would end up with an in-place offline upgrade
  • if users override all used stores with in-memory stores, an online in-place upgrade does not make sense
    • we recommend an in-place offline upgrade (we think this is fine, as users are fine with larger restore time anyway, as they use in-memory stores)
    • roll-over upgrade would also be possible

Test Plan

  • unit and integration tests for the new embedded timestamp feature
    • tests that ensure the timestamps are written and read correctly and that records are (de)serialized correctly
    • this includes reading source KTables, aggregation operations that result in KTables, and all other operators that might force a KTable materialization
    • KTable recovery/restore must be tested.
  • unit and integration tests for StreamsPartitionAssignor to react correctly to configs and received subscriptions
  • system tests that perform rolling bounce upgrades as described above
    • this should include failure scenarios during the upgrade
    • this should include "simulated upgrades" to metadata version 4, to ensure that the implementation works correctly for future changes

...

Serde<V> valueSerde;

    public static <K, V> StoreBuilder<SessionWithTimestampStore<K, V>> sessionWithTimestampStoreBuilder(final SessionBytesStoreSupplier supplier,
                                                                                                         final Serde<K> keySerde,
                                                                                                         final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> keyValueWithTimestampStore();

    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> windowWithTimestampStore();

    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>>> sessionWithTimestampStore();
}

New stores (RocksDB, InMemory) will be added that implement the corresponding new interfaces. Note, we keep the existing stores as-is and only add new stores that can be used by PAPI users too; by default, PAPI users would need to rewrite their application to switch from existing store usage to new stores if desired.

All users upgrade with a single rolling bounce per instance.

To hide the format change from DSL users, the new stores will handle the format change internally. We have to distinguish in-memory and persistent stores:

In-Memory stores:

  • on restart, data will be read from the changelog topic
  • the changelog format does not change
  • the record timestamp can be extracted and put into the value on read (RecordConverter will take care of this part)

Persistent stores:

  • For locally stored data, reads will be served with surrogate timestamp -1 (semantics is unknown).
  • On put, data will be stored using the new format.
  • Key-Value store: the new RocksDBWithTimestamp store is used. To isolate the old and new format, we use two column families. We perform dual put/get operations in the new and old column family to lazily upgrade all data from the old to the new format.
  • Window/session store: existing segments will use old format and new segments will be created with the new format.
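
The dual put/get idea for the key-value store can be sketched with two in-memory maps standing in for the two RocksDB column families. This is an illustration under stated assumptions, not the RocksDBWithTimestamp implementation: the class name is hypothetical, keys are plain strings, and entries are migrated only on put (whether reads should also migrate is left open here).

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class LazyUpgradeStoreSketch {

    /** Surrogate timestamp served for data that is still stored in the old format. */
    static final long UNKNOWN_TIMESTAMP = -1L;

    // Stand-ins for the two column families: old = plain value, new = timestamp + value.
    private final Map<String, byte[]> oldColumnFamily = new HashMap<>();
    private final Map<String, byte[]> newColumnFamily = new HashMap<>();

    LazyUpgradeStoreSketch(final Map<String, byte[]> existingOldFormatData) {
        oldColumnFamily.putAll(existingOldFormatData);
    }

    /** Writes always use the new format and retire the old-format entry, if any. */
    void put(final String key, final byte[] value, final long timestamp) {
        newColumnFamily.put(key, withTimestamp(timestamp, value));
        oldColumnFamily.remove(key);
    }

    /** Reads check the new column family first and fall back to the old one. */
    byte[] get(final String key) {
        final byte[] upgraded = newColumnFamily.get(key);
        if (upgraded != null) {
            return upgraded;
        }
        final byte[] legacy = oldColumnFamily.get(key);
        return legacy == null ? null : withTimestamp(UNKNOWN_TIMESTAMP, legacy);
    }

    int remainingOldEntries() {
        return oldColumnFamily.size();
    }

    static byte[] withTimestamp(final long timestamp, final byte[] value) {
        return ByteBuffer.allocate(Long.BYTES + value.length).putLong(timestamp).put(value).array();
    }

    static long timestampOf(final byte[] stored) {
        return ByteBuffer.wrap(stored).getLong();
    }

    public static void main(final String[] args) {
        final Map<String, byte[]> existing = new HashMap<>();
        existing.put("a", new byte[] {7});
        final LazyUpgradeStoreSketch store = new LazyUpgradeStoreSketch(existing);
        System.out.println(timestampOf(store.get("a")));
        store.put("a", new byte[] {8}, 100L);
        System.out.println(timestampOf(store.get("a")));
    }
}
```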

The described upgrade path works for library-provided store implementations out-of-the-box. All new stores implement the RecordConverter interface, which indicates that the store can handle the old and the new format and does a seamless upgrade internally.

There is one special case, in which users use a custom XxxByteStoreSupplier. For this case, the returned store won't implement RecordConverter and won't understand the new format. Hence, we use a proxy store to remove timestamps on write and add surrogate timestamp -1 (semantics is unknown) on read (note that for non-persistent custom stores, we don't need to use the proxy). Using the proxy is required to guarantee a backward compatible upgrade path. Users implementing a custom XxxByteStoreSupplier can opt in and extend their stores to implement the RecordConverter interface, too. For this case, we don't wrap the store with a proxy and it's the user's responsibility to implement the custom store correctly.
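
A minimal sketch of such a proxy, assuming the new format is an 8-byte timestamp prefix followed by the value: the class name is hypothetical and a plain map stands in for the user's custom store, which only ever sees the old format.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class TimestampStrippingProxySketch {

    static final long UNKNOWN_TIMESTAMP = -1L;

    // Stand-in for a custom store that only understands the old (plain value) format.
    private final Map<String, byte[]> customStore;

    TimestampStrippingProxySketch(final Map<String, byte[]> customStore) {
        this.customStore = customStore;
    }

    /** Drops the timestamp prefix before handing the value to the custom store. */
    void put(final String key, final byte[] timestampedValue) {
        customStore.put(key, Arrays.copyOfRange(timestampedValue, Long.BYTES, timestampedValue.length));
    }

    /** Adds the surrogate timestamp so that callers always see the new format. */
    byte[] get(final String key) {
        final byte[] plain = customStore.get(key);
        if (plain == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES + plain.length).putLong(UNKNOWN_TIMESTAMP).put(plain).array();
    }

    public static void main(final String[] args) {
        final Map<String, byte[]> inner = new HashMap<>();
        final TimestampStrippingProxySketch proxy = new TimestampStrippingProxySketch(inner);
        proxy.put("k", ByteBuffer.allocate(Long.BYTES + 1).putLong(55L).put((byte) 9).array());
        System.out.println(inner.get("k").length);
        System.out.println(ByteBuffer.wrap(proxy.get("k")).getLong());
    }
}
```

Note that the real timestamp is lost on the way through the proxy; this is the price for keeping the custom store's on-disk format unchanged.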

Compatibility, Deprecation, and Migration Plan

A simple rolling bounce upgrade is sufficient. For PAPI users, nothing changes at all. For DSL users, the internal RocksDBWithTimestampStore (as one example) will upgrade the store data lazily in the background. Only if users provide a custom XxxByteStoreSupplier does no upgrade happen and the old format is kept (users can opt in by implementing the RecordConverter interface). For this case, we use a proxy store that removes timestamps on write and adds them on read.

Test Plan

This feature can be mainly tested via unit and integration tests:

  • unit/integration tests for newly added stores
  • Upgrades can be simulated in integration tests by combining PAPI and DSL in one test run

Additionally, existing system tests are extended to perform rolling bounce upgrades.

Rejected Alternatives

  • background upgrade (ie, use existing store and migrate to new format in the background; switch over to new format later)
    • requires complex upgrade path with in-place/roll-over configuration and two rolling bounces

...

  • change the RocksDB on-disk format and encode the used serialization version per record (this would simplify future upgrades). However, there are major disadvantages:
    • storage amplification for local stores
    • record version could get stored in record headers in changelog topics -> changelog topic might never overwrite record with older format
    • code needs to check all versions all the time for future release: increases code complexity and runtime overhead
    • it's hard to change the key format
      • for value format, the version number can be a magic prefix byte
      • for key lookup, we would need to know the magic byte in advance for efficient point queries into RocksDB; if multiple versions exist in parallel, this is difficult (either do multiple queries with different version bytes until the entry is found or all versions are tried, implying the key does not exist – or use range queries, but those are very expensive)
  • encode the storage format in the directory name not as "store version number" but as "AK release number"
    • might be confusing to users if the store format does not change ("I am running Kafka 1.4, but the store indicates it's running on 1.2").
  • use a simpler offline upgrade path without any configs or complex rolling bounce scenarios
    • requires application down-time for upgrading to new format
  • use consumer's built-in protocol upgrade mechanism (ie, register multiple "assignment strategies")
    • has the disadvantage that we need to implement two StreamsPartitionAssignor classes
    • increased network traffic during rebalance
    • encoding "supported version" in metadata subsumes this approach for future releases anyway
    • if we want to "disable" the old protocol, a second rebalance is required, too
    • cannot avoid a second rebalance that is required for the state store upgrade
  • only support in-place upgrade path instead of two to simplify the process for users (don't need to pick)
    • might be prohibitive if not enough disk space is available
  • allow DSL users to stay with old format: upgrade would be simpler as it's only one rolling bounce
    • unclear default behavior: should we stay on 12.1 format by default or should we use 12.2 format by default?
      • if 12.1 is default, upgrade is simple, but users who write a new application must turn on 12.2 format explicitly
      • if 12.2 is default, simple upgrade requires a config that tells Streams to stay with 12.1 format
      • conclusion: upgrading and not upgrading is not straight forward either way, thus, just force upgrade
    • if no upgrade happens, new features (as listed above) would be useless
  • Only prepare stores for active tasks (but not standby tasks)
    • this would reduce the disk footprint during upgrade
    • disadvantage: when the switch to the new version happens, there are no hot standbys available for some time
    • we could make it configurable, however, we try to keep the number of configs small; also, it's already complex enough and adding more options would make it worse
    • it's not an issue for roll-over upgrade and not everybody configures standbys in the first place
      • people with standbys are willing to provide more disk, so it seems a fair assumption that they are fine with roll-over upgrade, too

...