...

Code Block
languagejava
package org.apache.kafka.streams.state;

// new interfaces

public interface ValueAndTimestamp<V> {
    V value();
    long timestamp();
}


public interface ReadOnlyKeyValueWithTimestampStore<K, V> extends ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> {}

public interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    void put(K key, V value, long timestamp);
    ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);
}

// extend existing classes (omitting existing methods)

public final class Stores {
    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueWithTimestampStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                          final Serde<K> keySerde,
                                                                                                          final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                                           final Serde<K> keySerde,
                                                                                                                           final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueWithTimestampStore<K, V>> keyValueWithTimestampStore();
}
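
For illustration, the following is a minimal usage sketch of the proposed API (store name and serdes are example choices; `Stores.persistentKeyValueStore` is the existing supplier factory, `context` is assumed to be a ProcessorContext, and `streams` a running KafkaStreams instance):

Code Block
languagejava
// build a timestamped key-value store with the proposed builder
StoreBuilder<KeyValueWithTimestampStore<String, Long>> builder =
    Stores.keyValueWithTimestampStoreBuilder(
        Stores.persistentKeyValueStore("counts"), // existing supplier factory
        Serdes.String(),
        Serdes.Long());

// inside a processor, after the store was added to the topology:
KeyValueWithTimestampStore<String, Long> store =
    (KeyValueWithTimestampStore<String, Long>) context.getStateStore("counts");
store.put("key", 42L, 1530000000000L);            // value plus explicit timestamp
ValueAndTimestamp<Long> result = store.get("key");
long value = result.value();                      // 42
long timestamp = result.timestamp();              // 1530000000000

// interactive queries: read-only access including timestamps
ReadOnlyKeyValueWithTimestampStore<String, Long> view =
    streams.store("counts", QueryableStoreTypes.keyValueWithTimestampStore());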

...

To make the upgrade possible, we generalize the changelog-to-store mapping from ConsumerRecords to KeyValue pairs – this allows us to customize the translation during store upgrade. To upgrade a store, a user updates her topology to use a `StoreUpgradeBuilder` that can return a proxy store, an upgrade store, and a regular store. If upgrade mode is enabled, the runtime will create a proxy store for StreamTasks that implements the new store API but internally maps back to the old store (ie, the old on-disk format). Using the proxy store allows running new code against the already existing old store and thus avoids any downtime. At the same time, the runtime will create StoreUpgradeTasks that use `RecordConverterStore`. For those StoreUpgradeTasks, the changelog topic will be consumed, the record format will be updated to the new format using the `RecordConverter` interface, and afterwards the new format is written to the (new) RocksDB (and back to the changelog topic if required). Because the changelog topic may contain data in different formats, we encode the format in the record header and enhance all changelog readers to check the header for the correct version. If upgrade mode is disabled, `StoreUpgradeBuilder` is used as a regular store builder returning the new store – the difference to using a builder for the new store directly is twofold: (1) with `StoreUpgradeBuilder` we can perform a final check whether the upgrade finished; if not, we first create no active tasks but only StoreUpgradeTasks using an upgrade store, and only after the upgrade is finished are the StoreUpgradeTasks destroyed and regular tasks using the new stores created. (2) After a successful upgrade, users don't need to rewrite their code (of course, they can still rewrite the code after a successful upgrade and replace `StoreUpgradeBuilder` with a store builder for the new store only).
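
To make the proxy idea more concrete, here is a rough, hypothetical sketch of how a proxy store could serve the new API from the old on-disk format (the class name, the surrogate timestamp, and the omission of the remaining store methods are simplifications of this sketch):

Code Block
languagejava
// hypothetical proxy store: implements the new API on top of the old store format
public class KeyValueToKeyValueWithTimestampProxyStore<K, V> {
    // would also implement KeyValueWithTimestampStore<K, V>; remaining methods omitted
    private final KeyValueStore<K, V> oldStore; // old format, stores no timestamps

    public KeyValueToKeyValueWithTimestampProxyStore(final KeyValueStore<K, V> oldStore) {
        this.oldStore = oldStore;
    }

    public void put(final K key, final V value, final long timestamp) {
        // the old format cannot hold the timestamp, so it is dropped during the upgrade phase
        oldStore.put(key, value);
    }

    public ValueAndTimestamp<V> get(final K key) {
        final V value = oldStore.get(key);
        if (value == null) {
            return null;
        }
        return new ValueAndTimestamp<V>() {
            @Override public V value() { return value; }
            @Override public long timestamp() { return -1L; } // surrogate: old format has no timestamp
        };
    }
}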

...

Code Block
languagejava
package org.apache.kafka.streams.processor;

/**
 * {@code RecordConverter} translates a {@link ConsumerRecord} into a {@link KeyValue} pair.
 */
public interface RecordConverter {

    /**
     * Convert a given record into a key-value pair for local storage.
     *
     * @param record the consumer record
     * @return the record as a key-value pair for local storage
     */
    KeyValue<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record);

}



/****************************************************************************************************/
package org.apache.kafka.streams.processor;

/**
 * {@code RecordConverterStore} translates a changelog record from old store format to new store format.
 */
public interface RecordConverterStore extends StateStore, RecordConverter {
}



/****************************************************************************************************/
package org.apache.kafka.streams.processor;

/**
 * {@code StoreUpgradeBuilder} that provides a store as well as corresponding proxy store and converter store.
 * <p>
 * The proxy store maps from a new store API to an old store API.
 * It is responsible for mapping the new API calls to the old API during the store upgrade phase.
 * <p>
 * The converter store is a store for the new format.
 * Additionally, it implements a {@link RecordConverter} that is used to translate from the old storage format to the new storage format.
 *
 * @param <S> store type (proxy store must implement this store type, too)
 * @param <C> new store type that additionally implements {@link RecordConverter}
 */
public interface StoreUpgradeBuilder<S extends StateStore, C extends RecordConverterStore> extends StoreBuilder<S> {

    /**
     * Return a new instance of the proxy store.
     * @return a new instance of the proxy store
     */
    S storeProxy();

    /**
     * Return a new instance of the converter store.
     * @return a new instance of the converter store
     */
    C converterStore();
}
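
As an illustration of the converter contract, the following hypothetical `RecordConverter` upgrades an old changelog record to a new value layout; the 8-byte timestamp prefix is an assumption of this sketch, not the final on-disk format:

Code Block
languagejava
import java.nio.ByteBuffer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;

// hypothetical converter: prefix the old value with the changelog record's timestamp
public class TimestampPrefixingRecordConverter implements RecordConverter {

    @Override
    public KeyValue<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
        final byte[] oldValue = record.value();
        if (oldValue == null) {
            return KeyValue.pair(record.key(), null); // preserve tombstones
        }
        // example new layout: 8-byte timestamp followed by the old value bytes
        final byte[] newValue = ByteBuffer.allocate(8 + oldValue.length)
            .putLong(record.timestamp())
            .put(oldValue)
            .array();
        return KeyValue.pair(record.key(), newValue);
    }
}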

...

The storage format change requires applications to upgrade carefully. We plan to offer the following upgrade paths (a configuration sketch follows the list):

  • In-place upgrade (online): this requires two rolling bounces of each application instance
    • advantage: simpler than roll-over upgrade
    • disadvantage: needs 2x local disk storage during upgrade
    • upgrade flow:
      1. prepare a jar hot swap from old version; Kafka Streams needs to be configured with upgrade.mode="in_place" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
      3. because upgrade mode is "in_place", each instance will create a "store upgrade" task for each assigned active and standby task, which starts to rebuild the RocksDB stores in the new format in parallel to the existing RocksDB stores
        1. the existing RocksDB stores will be used for processing so there is no downtime
        2. each instance builds up the new RocksDB state for all its assigned active and standby tasks
        3. during restore, users need to observe the restore progress (as actual processing resumes, restore will never be "finished", and it's up to the user when to trigger the second rolling bounce)
      4. after all stores are prepared, the user prepares a second rolling bounce; this time, the configuration parameter upgrade.mode must be removed for the new startup
      5. do a second rolling bounce for each instance to get the new config
        1. if prepare task directories are detected, we check if the upgrade is finished and, if not, finish the upgrade (ie, read the latest delta from the changelog); afterwards, the corresponding active task directories are replaced by the prepare task directories
        2. afterwards regular processing begins
  • In-place upgrade (offline): this requires a single rolling bounce of each application instance
    • advantage: simplest way to upgrade (only one rolling bounce)
    • disadvantage: needs 2x local disk storage during upgrade; application is offline during store upgrade
    • upgrade flow:
      1. prepare a jar hot swap from old version; Kafka Streams does not need to be configured with an upgrade mode (ie, upgrade.mode=null for startup)
      2. do a rolling bounce to get the new jar in place for each instance
      3. because upgrade mode is disabled, each instance only finds the old store format but no new store (thus it behaves exactly the same as in the second rolling bounce of the "in_place" online upgrade)
        1. create the prepare task directory and replay the "latest delta" from the changelog topic (it may happen that this delta is the complete changelog topic)
        2. after restore finishes (no processing during this time), the old store is replaced with the new store
        3. afterwards regular processing begins
  • Roll-over upgrade:
    • in-place upgrade might not be feasible because of its large local disk storage requirement; thus, roll-over is an alternative to enable upgrading if not enough local disk space is available for an in-place upgrade
    • if an application is running with N instances, the user starts N new instances in parallel (old and new instances form one consumer group); the new instances rebuild the RocksDB state with the new format; finally, the old instances are stopped
    • upgrade flow:
      1. prepare a jar hot swap from old version for all existing (old) instances; the old Kafka Streams configuration does not need to be changed (ie, upgrade.mode=null for startup)
      2. do a rolling bounce to get the new jar in place for each old instance
        1. all old instances will just resume processing as usual
        2. because upgrade mode is not enabled, no store upgrade tasks are started (cf. "In-place" upgrade above)
      3. the user starts N new instances with config parameter upgrade.mode="roll_over"
        1. the `roll_over_new` config will be encoded in the subscription metadata
        2. the leader can distinguish between old and new instances based on the encoded subscription information and assign tasks (active and restore) for restore to the "roll over" instances (ie, the assignment is "mirrored" to all "roll over" instances)
        3. "roll over" instances only create StoreUpgradeTasks and perform the restore
      4. all old instances are stopped
        1. as long as at least one old instance is alive, the leader assigns all active tasks to the old instances only
        2. however, as those are shut down in a row, ideally a rebalance will only trigger after all are down already (note, Kafka Streams does not send a "leave group request" and thus a rebalance only happens after session.timeout passes)
        3. when all old instances are down, the leader only receives subscriptions that encode "roll over" and thus the leader knows that the upgrade is completed
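
For reference, a sketch of the configuration changes across the two bounces of the in-place online path (upgrade.mode is the config proposed in this KIP; application id and bootstrap servers are example values):

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// first rolling bounce: enable the upgrade mode
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put("upgrade.mode", "in_place"); // proposed config, cf. upgrade flow above
// start KafkaStreams with these props and wait until the store upgrade has caught up

// second rolling bounce: remove the upgrade mode again
props.remove("upgrade.mode");
// restart KafkaStreams; prepared stores replace the old ones and regular processing resumes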

Upgrade scenarios:

Processor API:

  • using KeyValueStore and does not want to keep it
    • nothing to do; regular single rolling bounce upgrade
  • using KeyValueStore and want to upgrade to KeyValueWithTimestamp store
    • update code to use the new store and provide a corresponding `StoreUpgradeBuilder` (see the sketch after this list)
    • follow instructions as described above
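
For example, wiring the upgrade builder into a Processor API topology might look like the following sketch ("MyProcessor", the store name, and the serdes are example choices; the builder factory is the one proposed above):

Code Block
languagejava
// sketch: swap the old store builder for the proposed upgrade builder
final Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor("proc", MyProcessor::new, "source"); // user's existing processor
topology.addStateStore(
    Stores.keyValueToKeyValueWithTimestampUpgradeStoreBuilder(
        Stores.persistentKeyValueStore("counts"), // existing supplier factory
        Serdes.String(),
        Serdes.Long()),
    "proc");
// after the upgrade completed, this can be replaced by keyValueWithTimestampStoreBuilder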

DSL users (we will do another KIP for using the new RocksDB store in the DSL, because we need to add new API and deprecate existing API to switch the store type):

  • users running with default stores only
    • no code change required
    • all three upgrade modes described above are possible – the user must set the config accordingly
  • users replacing all RocksDBs with In-Memory stores
    • code change to In-Memory-with-Timestamp store recommended
    • to not break compatibility, we would also support an upgrade without code change (for this case, the current/old behavior would be preserved, and an upgrade is effectively suppressed)
    • a single rolling bounce upgrade with no upgrade config is sufficient
    • roll-over upgrade possible
  • users providing `Materialized` parameters for RocksDB (note, if we leverage RocksDB with timestamps, the API changes from KeyValueStore to KeyValueWithTimestampStore)
    • code change to XXX-with-Timestamp store recommended
    • to not break compatibility, we would also support an upgrade without code change (for this case, the current/old behavior would be preserved, and an upgrade is effectively suppressed)
    • if no code change is made, a single rolling bounce upgrade with no upgrade config is sufficient
    • if code is changed, all three upgrade modes are available as described above
  • note: users with "mixed" code usage that have at least one default store should consider following a two-bounce upgrade
    • if not, in the worst case they perform an in-place offline upgrade

Test Plan

  • unit and integration tests for the new embedded timestamp feature
    • tests that ensure the timestamps are written and read correctly and that records are (de)serialized correctly
    • this includes reading source KTables, aggregation operations that result in KTables, and all other operators that might force a KTable materialization
    • KTable recovery/restore must be tested.
  • unit and integration tests for StreamsPartitionAssignor to react correctly to configs and received subscriptions
  • system tests that perform rolling bounce upgrades as described above
    • this should include failure scenario during the upgrade
    • this should include "simulated upgrades" to metadata version 4, to ensure that the implementation work correct for future changes

...