Status

Current state: "Under Discussion"

Discussion thread: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

JIRA


Released: 1.2

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In order to improve the provided stream processing semantics of – and to add new features to – Kafka Streams, we want to be able to store record timestamps in KTables. This allows us to address multiple issues like

  • handling out-of-order data for source KTable (related to KAFKA-5533)
  • add TTL for KTables (KAFKA-4212 and KAFKA-4273)
  • return the timestamp of the latest update in Interactive Queries (KAFKA-4304)
  • improve timestamp propagation for DSL operators (KAFKA-6455)

The challenging part of this KIP is to define a smooth upgrade path with the upgraded RocksDB format. There are some initial thoughts on KAFKA-3522 already. The rest of this doc will first focus on the public interface additions for the upgrade path. Storing timestamps in RocksDB is just a special case though, and we propose to allow for a generic upgrade path from any storage format A to any other storage format B. This includes local storage in RocksDB as well as the underlying changelog topic.

Public Interfaces

We will add a new configuration parameter upgrade.mode that will be null by default and can take two values: "in_place" and "roll_over" with the following semantics:

  • null: no upgrade needed, run with latest formats
  • "in_place": prepare an in-place "standby" RocksDB with new format
  • "roll_over": prepare yourself for an upgrade of the rebalance user-metadata format and a roll-over upgrade of the RocksDB on-disk format
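
As a rough illustration of how an application might toggle the proposed parameter between the two rolling bounces of an in-place upgrade, consider the sketch below. The key `upgrade.mode` and the value `in_place` are taken from this proposal; the class and method names are hypothetical, and the key is set as a plain string because no corresponding `StreamsConfig` constant is assumed to exist.

```java
import java.util.Properties;

// Hypothetical helper; only the key "upgrade.mode" and the value "in_place" come from this KIP.
final class UpgradeModeConfigSketch {
    // Proposed config name, used as a plain string (no StreamsConfig constant is assumed)
    static final String UPGRADE_MODE_CONFIG = "upgrade.mode";

    /** Copy of the base config with upgrade mode enabled (first rolling bounce). */
    static Properties firstBounce(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);
        props.setProperty(UPGRADE_MODE_CONFIG, "in_place");
        return props;
    }

    /** Copy of the base config with the parameter removed again, ie, the null default (second rolling bounce). */
    static Properties secondBounce(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);
        props.remove(UPGRADE_MODE_CONFIG);
        return props;
    }
}
```

The resulting `Properties` would be passed to the `KafkaStreams` constructor as usual; all other settings are carried over unchanged.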

We add a new store type KeyValueWithTimestampStore that extends the existing KeyValueStore.

We generalize the translation from a changelog ConsumerRecord into a store KeyValue pair using a new interface RecordConverter – there will be a default implementation that does a 1:1 mapping from key to key and value to value. For the new KeyValueWithTimestampStore, we implement a mapping from key to key and from `value plus timestamp` to value.

We introduce the interface RecordConverterStore that allows translating from the old storage format to the new storage format.

We introduce the interface StoreUpgradeBuilder that extends StoreBuilder and can return a "store proxy" that maps from the new store API to an internally used old store, and that can return a RecordConverterStore for the actual store upgrade. Thus, StoreUpgradeBuilder can return a proxy, upgrade, and new store.

Proposed Changes

To make use of the new timestamp that can be stored, we need to add new interfaces to the existing store interfaces that allow reading/writing the timestamp and the value at once.

package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serde;

// new interfaces

public interface ValueAndTimestamp<V> {
    V value();
    long timestamp();
}


public interface ReadOnlyKeyValueWithTimestampStore<K, V> extends ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> {}

public interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    void put(K key, V value, long timestamp);
    ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);
}

// extend existing classes (omitting existing methods)

public final class Stores {
    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueWithTimestampStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                          final Serde<K> keySerde,
                                                                                                          final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                                           final Serde<K> keySerde,
                                                                                                                           final Serde<V> valueSerde);
}

public final class QueryableStoreTypes {
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueWithTimestampStore<K, V>> keyValueWithTimestampStore();
}

We extend our existing stores (RocksDB, InMemory) to implement the corresponding new interfaces. Note that we will keep the old stores and extend them to give PAPI users the choice to use stores with or without the ability to store timestamps.

The usage/implementation of upgrade stores is described in the next section.
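
To illustrate the intended semantics of the new `put` and `putIfAbsent` overloads, here is a minimal in-memory sketch. It does not use the Kafka Streams classes: the nested `ValueAndTimestamp` interface is a local stand-in that mirrors the proposed one, and the class name is hypothetical. It assumes `putIfAbsent` keeps the contract of the existing `KeyValueStore#putIfAbsent`, ie, it returns the old entry or null if there was none.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory sketch; not the proposed RocksDB or InMemory implementation.
final class InMemoryTimestampedStoreSketch<K, V> {

    // Local stand-in mirroring the proposed ValueAndTimestamp interface
    interface ValueAndTimestamp<T> {
        T value();
        long timestamp();
    }

    private final Map<K, ValueAndTimestamp<V>> data = new HashMap<>();

    private static <T> ValueAndTimestamp<T> of(final T value, final long timestamp) {
        return new ValueAndTimestamp<T>() {
            @Override public T value() { return value; }
            @Override public long timestamp() { return timestamp; }
        };
    }

    /** Store value and timestamp together under the given key. */
    void put(final K key, final V value, final long timestamp) {
        data.put(key, of(value, timestamp));
    }

    /** Returns the existing entry, or null if the key was absent and the new entry was stored. */
    ValueAndTimestamp<V> putIfAbsent(final K key, final V value, final long timestamp) {
        return data.putIfAbsent(key, of(value, timestamp));
    }

    /** Read value and timestamp at once. */
    ValueAndTimestamp<V> get(final K key) {
        return data.get(key);
    }
}
```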

Upgrading

For a clean upgrade path for RocksDB itself, we need to introduce the above configs and new interfaces to implement the actual upgrade code. We will provide implementations to upgrade from an existing KeyValueStore to a KeyValueWithTimestampStore. The defined interfaces are generic though, allowing users to implement "upgrade stores" from any store type A to any other store type B.

To make the upgrade possible, we generalize the changelog-to-store mapping from ConsumerRecords to KeyValue pairs – this allows customizing the translation during a store upgrade. To upgrade a store, a user updates her topology to use a `StoreUpgradeBuilder` that can return a proxy store, an upgrade store, and a regular store.

If upgrade mode is enabled, the runtime will create a proxy store for StreamTasks that implements the new store API but internally maps back to the old store (ie, the old on-disk format). Using the proxy store allows running new code against the already existing old store and thus avoids any downtime. At the same time, the runtime will create StoreUpgradeTasks that use `RecordConverterStore`. For those StoreUpgradeTasks, the changelog topic is consumed and the record format is updated to the new format using the `RecordConverter` interface; afterwards, the new format is written to the (new) RocksDB (and back to the changelog topic if required). Because the changelog topic may contain data in different formats, we encode the format in the record header and enhance all changelog readers to check the header for the correct version.

If upgrade mode is disabled, `StoreUpgradeBuilder` is used as a regular store builder returning the new store. The difference to using a builder for the new store directly is twofold: (1) Using `StoreUpgradeBuilder`, we can perform a final check whether the upgrade has finished. If not, we first create no active tasks but only StoreUpgradeTasks using an upgrade store. Only after the upgrade has finished are the StoreUpgradeTasks destroyed and regular tasks using the new stores created. (2) After a successful upgrade, users don't need to rewrite their code (of course, they can still rewrite the code after a successful upgrade and replace `StoreUpgradeBuilder` with a store builder for the new store only).
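
The proxy store idea can be sketched as follows. This is only an illustration under stated assumptions: a plain `Map` stands in for the old-format store, the class and the nested `ValueAndTimestamp` type are hypothetical, and returning a sentinel timestamp of -1 for reads is an assumption of this sketch (the KIP does not specify what a proxy returns for the timestamp that the old format cannot hold).

```java
import java.util.Map;

// Hypothetical proxy: exposes the new (value + timestamp) API on top of an old-format store.
final class ProxyStoreSketch<K, V> {
    // Assumption of this sketch: the old format has no timestamp, so reads report a sentinel
    static final long UNKNOWN_TIMESTAMP = -1L;

    // Local stand-in for the proposed ValueAndTimestamp type
    static final class ValueAndTimestamp<T> {
        final T value;
        final long timestamp;

        ValueAndTimestamp(final T value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for the existing old-format KeyValueStore
    private final Map<K, V> oldStore;

    ProxyStoreSketch(final Map<K, V> oldStore) {
        this.oldStore = oldStore;
    }

    /** New API call; the timestamp is dropped because the old format cannot store it. */
    void put(final K key, final V value, final long timestamp) {
        oldStore.put(key, value);
    }

    /** New API call; wraps the old value with the sentinel timestamp, or returns null if absent. */
    ValueAndTimestamp<V> get(final K key) {
        final V value = oldStore.get(key);
        return value == null ? null : new ValueAndTimestamp<>(value, UNKNOWN_TIMESTAMP);
    }
}
```

With such a proxy, processor code written against the new API keeps running on the old on-disk data while the StoreUpgradeTasks build the new-format store in the background.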

As mentioned, we generalize the changelog-to-store mapping from ConsumerRecords to KeyValue pairs. If a store (like KeyValueWithTimestampStore) requires a non-default mapping, the corresponding `StateRestoreCallback` must implement `RecordConverter`, too.

package org.apache.kafka.streams.processor;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;

/**
 * {@code RecordConverter} translates a {@link ConsumerRecord} into a {@link KeyValue} pair.
 */
public interface RecordConverter {

    /**
     * Convert a given record into a key-value pair.
     *
     * @param record the consumer record
     * @return the record as key-value pair
     */
    KeyValue<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record);

}



/****************************************************************************************************/
package org.apache.kafka.streams.processor;

/**
 * {@code RecordConverterStore} translates a changelog record from old store format to new store format.
 */
public interface RecordConverterStore extends StateStore, RecordConverter {
}



/****************************************************************************************************/
package org.apache.kafka.streams.processor;

import org.apache.kafka.streams.state.StoreBuilder;

/**
 * {@code StoreUpgradeBuilder} that provides a store as well as corresponding proxy store and converter store.
 * <p>
 * The proxy store maps from a new store API to an old store API.
 * It is responsible to map from the new API calls to the old API during store upgrade phase.
 * <p>
 * The converter store is a store for the new format.
 * Additionally, it implements a {@link RecordConverter} that is used to translate from the old storage format to the new storage format.
 *
 * @param <S> store type (proxy store must implement this store type, too)
 * @param <C> new store type that additionally implements {@link RecordConverter}
 */
public interface StoreUpgradeBuilder<S extends StateStore, C extends RecordConverterStore> extends StoreBuilder<S> {

    /**
     * Return a new instance of the proxy store.
     * @return a new instance of the proxy store
     */
    S storeProxy();

    /**
     * Return a new instance of the converter store
     * @return a new instance of converter store
     */
    C converterStore();
}


For the actual RocksDB storage format for KeyValueWithTimestampStore, we add the record timestamp as an 8-byte (long) prefix to the value; ie, we change the format from <key:value> to <key:timestamp+value>. We need to introduce a new value serde that wraps the original value serde as well as a long serde. One important detail is that the serde only changes for the store, but not the changelog topic: the underlying changelog topic stores the timestamp in the record metadata timestamp field already. We need to intercept the write to the changelog topic accordingly.
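
The <key:timestamp+value> layout can be sketched with plain `java.nio.ByteBuffer` calls. The class and method names below are hypothetical, the value is assumed to be already serialized by the original value serde, and big-endian byte order (the `ByteBuffer` default) is an assumption of this sketch. Null values (tombstones) are not handled here.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper illustrating the store-side value layout: [8-byte timestamp][serialized value]
final class TimestampedValueFormatSketch {

    /** Prefix the already-serialized value with the record timestamp. */
    static byte[] encode(final long timestamp, final byte[] rawValue) {
        return ByteBuffer.allocate(Long.BYTES + rawValue.length)
            .putLong(timestamp)
            .put(rawValue)
            .array();
    }

    /** Read the timestamp from the first 8 bytes of a stored value. */
    static long timestamp(final byte[] stored) {
        return ByteBuffer.wrap(stored).getLong();
    }

    /** Strip the 8-byte prefix, eg, before the plain value is written to the changelog topic. */
    static byte[] rawValue(final byte[] stored) {
        return Arrays.copyOfRange(stored, Long.BYTES, stored.length);
    }
}
```

On restore, the direction is reversed: the timestamp from the changelog record metadata and the plain value from the record are combined via `encode` before being written to the store.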

For the actual upgrade, which might be "in place", we need to make sure to use different directories. Thus, StoreUpgradeTasks create store directories with suffix _prepare, ie, for each active task with task directory `X_Y`, a StoreUpgradeTask will use task directory `X_Y_prepare`. The directory isolation at task level ensures that we can reuse the same internal store directory structure for active and store-upgrade tasks. After the stores are restored, in a second rebalance, the old task directory will be renamed, the "prepare" directory will be renamed to act as the new active task directory, and finally we delete the renamed original task directory to free up the disk space.
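
The rename sequence described above could look roughly like the following sketch based on `java.nio.file.Files`. The class name is hypothetical, and the `_old` suffix for the renamed original directory is an assumption (the KIP only states that the old directory is renamed). Crash recovery between the individual steps is deliberately left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch of swapping task directory X_Y_prepare into place as X_Y.
final class TaskDirectorySwapSketch {

    static void swap(final Path stateDir, final String taskId) {
        final Path active = stateDir.resolve(taskId);
        final Path prepared = stateDir.resolve(taskId + "_prepare");
        final Path retired = stateDir.resolve(taskId + "_old"); // suffix is an assumption
        try {
            Files.move(active, retired);   // 1. rename the old task directory
            Files.move(prepared, active);  // 2. the prepare directory becomes the active one
            deleteRecursively(retired);    // 3. free up the disk space
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void deleteRecursively(final Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order so that children are deleted before their parent directory
            for (final Path p : (Iterable<Path>) paths.sorted(Comparator.reverseOrder())::iterator) {
                Files.delete(p);
            }
        }
    }
}
```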

Compatibility, Deprecation, and Migration Plan

The storage format change requires applications to upgrade correctly. We plan to offer two upgrade paths.

  • In-place upgrade: this requires two rolling bounces of each application instance
    • advantage: simpler than roll-over upgrade
    • disadvantage: need 2x local disk storage during upgrade
    • upgrade flow:
      1. prepare a jar hot swap from old version; Kafka Streams needs to be configured with upgrade.mode="in_place" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
      3. because upgrade mode is "in place", each instance will create a "store upgrade" task for each assigned active and standby task, which starts to rebuild RocksDB stores in the new format in parallel to the existing RocksDB
        1. existing RocksDB will be used for processing so there is no down time
        2. each instance builds up the new RocksDB state for all its assigned active and standby tasks
        3. during restore, users need to observe restore progress (as actual processing resumes, restore will never be "finished", and it's up to the user when to trigger the second rolling bounce)
      4. after all stores are prepared, the user prepares a second rolling bounce; this time, the configuration parameter upgrade.mode must be removed for the new startup
      5. do a second rolling bounce for each instance to get new config
        1. if prepare task directories are detected, we check whether the upgrade is finished and, if not, finish it (ie, read the latest delta from the changelog); then the corresponding active directories are replaced by the prepare task directories
        2. afterwards regular processing begins
  • Roll-over upgrade:
    • in-place upgrade might not be feasible because of its large local disk storage requirement; thus, roll-over is an alternative to enable upgrading if not enough local disk space is available for an in-place upgrade
    • if an application is running with N instances, the user starts N new instances in parallel (they all form one consumer group); the new instances rebuild the RocksDB state with the new format; finally, the old instances are stopped
    • upgrade flow:
      1. prepare a jar hot swap from old version for all existing (old) instances; Kafka Streams needs to be configured with upgrade.mode="roll_over" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
        1. all old instances will just resume processing as usual
        2. because upgrade mode is "roll over", no store upgrade tasks are started (cf. "In-place" upgrade above)
      3. the user starts N new instances with config parameter upgrade.mode="roll_over_new"
        1. The `roll_over_new` config will be encoded in the subscription metadata
        2. the leader can distinguish between old and new instances based on the information encoded in the Subscription and assign tasks (active and restore) for restore to the "roll over" instances (ie, the assignment is "mirrored" to all "roll over" instances)
        3. "roll over" instances only create StoreUpgradeTasks and perform the restore
      4. all old instances are stopped
        1. as long as at least one old instance is alive, the leader assigns all active tasks to the old instances only
        2. however, as those are shut down in a row, ideally a rebalance will only trigger after all are down already (note, Kafka Streams does not send a "leave group request" and thus a rebalance only happens after session.timeout passes)
        3. when all old instances are down, the leader only receives version 3 Subscriptions and computes a version 3 Assignment based on the prepared stores, and the upgrade is finished
    • upgrading from 0.10.0.x to 1.2 uses the same upgrade pattern; however, the configs upgrade.from="0.10.0.x" and upgrade.mode="roll_over" must be used
      • instead of old Subscription and Assignment metadata version 2, metadata version 1 is used
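
The leader-side rule from step 4 of the roll-over flow (active tasks stay on old instances as long as at least one of them is alive) can be sketched as below. All names are hypothetical, and the boolean flag stands in for the `roll_over_new` information that the proposal encodes in the subscription metadata; the real assignor works on Subscription and Assignment objects rather than on this simplified member list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how a leader might pick the owners of active tasks during roll-over.
final class RollOverAssignmentSketch {

    // Simplified group member; rollOverNew mirrors upgrade.mode="roll_over_new"
    static final class Instance {
        final String id;
        final boolean rollOverNew;

        Instance(final String id, final boolean rollOverNew) {
            this.id = id;
            this.rollOverNew = rollOverNew;
        }
    }

    /** Ids of the instances that are eligible to receive active tasks. */
    static List<String> activeTaskOwners(final List<Instance> members) {
        final List<String> oldInstances = new ArrayList<>();
        final List<String> newInstances = new ArrayList<>();
        for (final Instance member : members) {
            (member.rollOverNew ? newInstances : oldInstances).add(member.id);
        }
        // As long as at least one old instance is alive, active tasks stay on old instances only;
        // new instances then only run StoreUpgradeTasks
        return oldInstances.isEmpty() ? newInstances : oldInstances;
    }
}
```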

Test Plan

  • unit and integration tests for the new embedded timestamp feature
    • tests that ensure the timestamps are written and read correctly and that records are (de)serialized correctly
    • this includes reading source KTables, aggregation operations that result in KTables, and all other operators that might force a KTable materialization
    • KTable recovery/restore must be tested.
  • unit and integration tests for StreamsPartitionAssignor to verify that it reacts correctly to configs and received subscriptions
  • system tests that perform rolling bounce upgrades as described above
    • this should include failure scenarios during the upgrade
    • this should include "simulated upgrades" to metadata version 4, to ensure that the implementation works correctly for future changes

Rejected Alternatives

  • change the RocksDB on-disk format and encode the used serialization version per record (this would simplify future upgrades). However, there are major disadvantages:
    • storage amplification for local stores
    • record version could get stored in record headers in changelog topics -> changelog topic might never overwrite record with older format
    • code needs to check all versions all the time for future release: increases code complexity and runtime overhead
    • it's hard to change the key format
      • for value format, the version number can be a magic prefix byte
      • for key lookup, we would need to know the magic byte in advance for efficient point queries into RocksDB; if multiple versions exist in parallel, this is difficult (either do multiple queries with different version bytes until the entry is found or all versions have been tried, implying the key does not exist – or use range queries, but those are very expensive)
  • encode the storage format in the directory name not as "store version number" but as "AK release number"
    • might be confusing to users if the store format does not change ("I am running Kafka 1.4, but the store indicates it's running on 1.2").
  • use a simpler upgrade path without any configs or complex rolling bounce scenarios
    • requires application down-time for upgrading to new format
  • use consumer's built-in protocol upgrade mechanism (ie, register multiple "assignment strategies")
    • has the disadvantage that we need to implement two StreamsPartitionAssignor classes
    • increased network traffic during rebalance
    • encoding "supported version" in metadata subsumes this approach for future releases anyway
    • if we want to "disable" the old protocol, a second rebalance is required, too
    • cannot avoid a second rebalance that is required for the state store upgrade
  • only support in-place upgrade path instead of two to simplify the process for users (don't need to pick)
    • might be prohibitive if not enough disk space is available
  • allow DSL users to stay with old format: upgrade would be simpler as it's only one rolling bounce
    • unclear default behavior: should we stay on 1.1 format by default or should we use 1.2 format by default?
      • if 1.1 is default, upgrade is simple, but users who write a new application must turn on the 1.2 format explicitly
      • if 1.2 is default, simple upgrade requires a config that tells Streams to stay with 1.1 format
      • conclusion: upgrading and not upgrading is not straightforward either way; thus, just force the upgrade
    • if no upgrade happens, new features (as listed above) would be useless
  • Only prepare stores for active task (but not standby tasks)
    • this would reduce the disk footprint during upgrade
    • disadvantage: when the switch to the new version happens, there are no hot standbys available for some time
    • we could make it configurable, however, we try to keep the number of configs small; also, it's already complex enough and adding more options would make it worse
    • it's not an issue for roll-over upgrade, and not everybody configures standbys in the first place
      • people with standbys are willing to provide more disk, so it seems a fair assumption that they are fine with roll-over upgrade, too

 
