Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-14491

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • a new interface for versioned stores, and a helper class: VersionedKeyValueStore<K, V> extends StateStore, with helper VersionedRecord
  • a new interface for versioned store suppliers, and a helper interface: VersionedBytesStoreSupplier extends KeyValueBytesStoreSupplier, with helper VersionedBytesStore
  • three new methods in Stores.java:
    • two for creating a persistent, versioned store supplier: Stores#persistentVersionedKeyValueStore(...) plus an overload
    • another for creating a StoreBuilder from a versioned supplier: Stores#versionedKeyValueStoreBuilder(...) 
  • a new method in TopologyTestDriver.java for getting a versioned store from a topology
  • a new static method in ValueAndTimestamp.java for creating ValueAndTimestamp instances where the value may be null: ValueAndTimestamp#makeAllowNullable(...) 

...

Code Block
package org.apache.kafka.streams.state;

/**
 * A key-value store that stores multiple record versions per key, and supports timestamp-based
 * retrieval operations to return the latest record (per key) as of a specified timestamp.
 * Only one record is stored per key and timestamp, i.e., a second call to
 * {@link #put(Object, Object, long)} with the same key and timestamp will replace the first.
 * <p>
 * Each store instance has an associated, fixed-duration "history retention" which specifies
 * how long old record versions should be kept for. In particular, a versioned store guarantees
 * to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp
 * bound is within history retention of the current observed stream time. (Queries with timestamp
 * bound older than the specified history retention are considered invalid.)
 * <p>
 * The store's "history retention" also doubles as its "grace period," which determines how far
 * back in time writes to the store will be accepted. A versioned store will not accept writes
 * (inserts, updates, or deletions) if the timestamp associated with the write is older than the
 * current observed stream time by more than the grace period.
 *
 * @param <K> The key type
 * @param <V> The value type
 */
public interface VersionedKeyValueStore<K, V> extends StateStore {

    /**
     * Add a new record version associated with this key.
     * <p>
     * If the timestamp associated with the new record version is older than the store's
     * grace period (i.e., history retention) relative to the current observed stream time,
     * then the record will not be added.
     *
     * @param key       The key
     * @param value     The value, it can be {@code null};
     *                  if the serialized bytes are also {@code null} it is interpreted as a delete
     * @param timestamp The timestamp for this record version
     * @throws NullPointerException If {@code null} is used for key.
     */
    void put(K key, V value, long timestamp);

    /**
     * Delete the value associated with this key from the store, at the specified timestamp
     * (if there is such a value), and return the deleted value.
     * <p>
     * If the timestamp associated with this deletion is older than the store's grace period
     * (i.e., history retention) relative to the current observed stream time, then the deletion
     * will not be performed and {@code null} will be returned.
     * <p>
     * This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)}
     * followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}, with
     * a caveat that the return value is always {@code null} if the deletion timestamp
     * is older than the store's grace period (i.e., history retention), regardless of
     * what {@link #get(Object, long) #get(key, timestamp)} would return.
     *
     * @param key       The key
     * @param timestamp The timestamp for this delete
     * @return The value and timestamp of the latest record associated with this key as of
     *         the deletion timestamp (inclusive), or {@code null} if no such record exists
     *         (including if the deletion timestamp is older than this store's history
     *         retention time, i.e., the store no longer contains data for the provided
     *         timestamp). Note that the record timestamp {@code r.timestamp()} of the
     *         returned {@link VersionedRecord} may be smaller than the provided deletion
     *         timestamp.
     * @throws NullPointerException If {@code null} is used for key.
     */
    VersionedRecord<V> delete(K key, long timestamp);

    /**
     * Get the latest (by timestamp) record associated with this key.
     *
     * @param key The key to fetch
     * @return The value and timestamp of the latest record associated with this key, or
     *         {@code null} if either (1) the store contains no records for this key or (2) the
     *         latest record for this key is a tombstone.
     * @throws NullPointerException       If null is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K key);

    /**
     * Get the latest record associated with this key with timestamp not exceeding the specified
     * timestamp bound.
     *
     * @param key           The key to fetch
     * @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp of the record associated with this key
     *         as of the provided timestamp, or {@code null} if no such record exists
     *         (including if the provided timestamp bound is older than this store's history
     *         retention time, i.e., the store no longer contains data for the provided
     *         timestamp). Note that the record timestamp {@code r.timestamp()} of the
     *         returned {@link VersionedRecord} may be smaller than the provided timestamp
     *         bound.
     * @throws NullPointerException       If null is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K key, long asOfTimestamp);
}
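
The semantics described in the Javadoc above can be illustrated with a toy in-memory model. This is only a sketch and not the proposed implementation: the class name VersionedStoreSketch, its nested Version class, and the use of String keys and values are all hypothetical, and the sketch never expires old versions, so it does not model queries older than history retention.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class VersionedStoreSketch {

    /** Hypothetical stand-in for VersionedRecord: a non-null value plus its timestamp. */
    public static final class Version {
        public final String value;
        public final long timestamp;

        Version(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long historyRetentionMs;   // doubles as the grace period
    private long observedStreamTime = -1L;   // highest timestamp accepted so far
    // Per key: timestamp -> value; a null value marks a tombstone.
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    public VersionedStoreSketch(long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    /** Drops writes older than the grace period; otherwise stores or replaces the version. */
    public void put(String key, String value, long timestamp) {
        if (timestamp < observedStreamTime - historyRetentionMs) {
            return; // too old relative to stream time: the write is not accepted
        }
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Latest version with timestamp <= asOfTimestamp, or null if none exists or it is a tombstone. */
    public Version get(String key, long asOfTimestamp) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
        if (entry == null || entry.getValue() == null) {
            return null;
        }
        return new Version(entry.getValue(), entry.getKey());
    }

    /** Returns the version visible at the timestamp, then writes a tombstone at that timestamp. */
    public Version delete(String key, long timestamp) {
        if (timestamp < observedStreamTime - historyRetentionMs) {
            return null; // outside the grace period: nothing is deleted
        }
        Version previous = get(key, timestamp);
        put(key, null, timestamp);
        return previous;
    }
}
```

In this model, a delete at timestamp 30 after puts at timestamps 10 and 20 returns the version written at 20, which shows why the returned record timestamp may be smaller than the deletion timestamp.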

Note that this proposal intentionally omits most methods from the existing KeyValueStore interface in order to keep the new interface simple. It could be nice to add additional methods in the future, such as rangeKey() methods to enable the foreign-key join subscription store use case, but this is deferred to a future KIP in order to align on these basic interfaces first.

The VersionedRecord return type from the get() methods is essentially the same as the existing ValueAndTimestamp class today, but is its own separate class so that we can evolve it in the future. For example, we may want to add an additional timestamp to the VersionedRecord class to represent the expiry time of the record version (i.e., the timestamp of the next record version for this key) in addition to the existing timestamp.

Code Block
package org.apache.kafka.streams.state;

import java.util.Objects;

/**
 * Combines a value from a {@link KeyValue} with a timestamp, for use as the return type
 * from {@link VersionedKeyValueStore#get(Object, long)} and related methods.
 *
 * @param <V> The value type
 */
public final class VersionedRecord<V> {
    private final V value;
    private final long timestamp;

    /**
     * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
     *
     * @param value      the value
     * @param timestamp  the timestamp
     */
    public VersionedRecord(final V value, final long timestamp) {
        this.value = Objects.requireNonNull(value);
        this.timestamp = timestamp;
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }

    @Override
    public String toString() {
        return "<" + value + "," + timestamp + ">";
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final VersionedRecord<?> that = (VersionedRecord<?>) o;
        return timestamp == that.timestamp &&
            Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, timestamp);
    }
}

Store Supplier/Builder Interfaces

The new Stores.java methods are as follows:

Code Block
public final class Stores {

    // ... existing methods ...

    /**
     * Create a persistent versioned key-value store {@link VersionedBytesStoreSupplier}.
     * <p>
     * This store supplier can be passed into a
     * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}.
     *
     * @param name             name of the store (cannot be {@code null})
     * @param historyRetention length of time that old record versions are available for query
     *                         (cannot be negative). If a timestamp bound provided to
     *                         {@link VersionedKeyValueStore#get(Object, long)} is older than this
     *                         specified history retention, then the get operation will not return data.
     * @return an instance of {@link VersionedBytesStoreSupplier}
     * @throws IllegalArgumentException if {@code historyRetention} can't be represented as {@code long milliseconds}
     */
    public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(final String name,
                                                                               final Duration historyRetention) {
        // ...
    }

    /**
     * Create a persistent versioned key-value store {@link VersionedBytesStoreSupplier}.
     * <p>
     * This store supplier can be passed into a
     * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}.
     *
     * @param name             name of the store (cannot be {@code null})
     * @param historyRetention length of time that old record versions are available for query
     *                         (cannot be negative). If a timestamp bound provided to
     *                         {@link VersionedKeyValueStore#get(Object, long)} is older than this
     *                         specified history retention, then the get operation will not return data.
     * @param segmentInterval  size of segments for storing old record versions (must be positive). Old record versions
     *                         for the same key in a single segment are stored (updated and accessed) together.
     *                         The only impact of this parameter is performance. If segments are large
     *                         and a workload results in many record versions for the same key being collected
     *                         in a single segment, performance may degrade as a result. On the other hand,
     *                         reads and out-of-order writes which access older segments may slow down if
     *                         there are too many segments.
     * @return an instance of {@link VersionedBytesStoreSupplier}
     * @throws IllegalArgumentException if {@code historyRetention} or {@code segmentInterval} can't be represented as {@code long milliseconds}
     */
    public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(final String name,
                                                                               final Duration historyRetention,
                                                                               final Duration segmentInterval) {
        // ...
    }

    /**
     * Creates a {@link StoreBuilder} that can be used to build a {@link VersionedKeyValueStore}.
     *
     * @param supplier   a {@link VersionedBytesStoreSupplier} (cannot be {@code null})
     * @param keySerde   the key serde to use
     * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
     *                   it is treated as a deletion
     * @param <K>        key type
     * @param <V>        value type
     * @return an instance of a {@link StoreBuilder} that can build a {@link VersionedKeyValueStore}
     */
    public static <K, V> StoreBuilder<VersionedKeyValueStore<K, V>> versionedKeyValueStoreBuilder(final VersionedBytesStoreSupplier supplier,
                                                                                                  final Serde<K> keySerde,
                                                                                                  final Serde<V> valueSerde) {
        // ...
    }
}

Understanding the history retention and segment interval parameters of the persistentVersionedKeyValueStore() methods requires a brief discussion of the planned RocksDB implementation for versioned stores.

RocksDB Implementation Overview

Here's a high-level overview of the RocksDB versioned store implementation (details are outside the scope of this KIP).

Each store has an associated, fixed-duration history retention which specifies how long old record versions should be kept for. In particular, a versioned store guarantees to return accurate results for calls to get(key, asOfTimestamp) where the provided timestamp bound is within history retention of the current observed stream time. (If the timestamp bound is outside the specified history retention, then a record is still returned if the latest record version for the key satisfies the timestamp bound. Otherwise, a warning is logged and null is returned.)

To achieve this, the store will consist of a "latest value store" and "segment stores." The latest record version for each key will be stored in the latest value store, while all older versions will be stored in the segment stores. 

Each record version has two associated timestamps:

  • a validFrom timestamp. This timestamp is explicitly associated with the record as part of the put() call to the store; i.e., this is the record's timestamp.
  • a validTo timestamp. This is the timestamp of the next record (or deletion) associated with the same key, and is implicitly associated with the record. This timestamp can change as new records are inserted into the store.

The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and can change as new record versions are inserted into the store (and validTo changes as a result).
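
As an illustration of how validTo is derived rather than stored with the write, here is a minimal sketch that keeps the versions of a single key in a sorted map keyed by validFrom. The class name ValidityIntervalSketch and the UNBOUNDED sentinel for the latest version are assumptions made for this example only.

```java
import java.util.TreeMap;

public class ValidityIntervalSketch {

    /** Sentinel for "no later version exists yet"; a hypothetical choice for this sketch. */
    public static final long UNBOUNDED = Long.MAX_VALUE;

    /** validTo is the validFrom timestamp of the next version of the same key, if any. */
    public static long validTo(TreeMap<Long, String> versionsByValidFrom, long validFrom) {
        Long next = versionsByValidFrom.higherKey(validFrom);
        return next == null ? UNBOUNDED : next;
    }

    /** A version is valid for timestamps in [validFrom, validTo). */
    public static boolean isValidAt(long validFrom, long validTo, long timestamp) {
        return validFrom <= timestamp && timestamp < validTo;
    }
}
```

With versions at validFrom 10 and 30, the first version has validTo 30. Inserting an out-of-order version at 20 shortens the first version's interval to [10, 20) without touching the record itself, which is the sense in which validTo can change over time.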

Old record versions are stored in segment stores according to their validTo timestamps. The use of segments here is analogous to that in the existing RocksDB implementation for windowed stores. Because records are stored in segments based on their validTo timestamps, this means that entire segments can be expired at a time once the records contained in the segment are no longer relevant based on the store's history retention. (A difference between the versioned store segments implementation and that of windowed stores today is that for versioned stores all segments will share the same physical RocksDB instance, in contrast to windowed stores where each segment is its own RocksDB, to allow for many more segments than windowed stores use today.)
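
The relationship between validTo, segments, and expiry can be sketched as follows. The KIP leaves the details out of scope, so both the mapping from validTo to a segment (integer division by the segment interval) and the expiry check are assumptions for illustration, and the names SegmentSketch, segmentId, and isExpired are hypothetical.

```java
public class SegmentSketch {

    /** Assumed mapping: each segment covers a fixed-width range of validTo timestamps. */
    public static long segmentId(long validTo, long segmentIntervalMs) {
        return Math.floorDiv(validTo, segmentIntervalMs);
    }

    /**
     * A segment can be dropped as a whole once every validTo it could contain is
     * at or before (stream time - history retention). This check is slightly
     * conservative because it compares against the exclusive end of the segment.
     */
    public static boolean isExpired(long segmentId, long segmentIntervalMs,
                                    long streamTime, long historyRetentionMs) {
        long segmentEndExclusive = (segmentId + 1) * segmentIntervalMs;
        return segmentEndExclusive <= streamTime - historyRetentionMs;
    }
}
```

Because a record stops being relevant once its validTo falls out of history retention, grouping by validTo (rather than validFrom) is what lets an entire segment expire at once.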

The segment interval parameter for controlling segment size is (optionally) exposed to users in the static constructor methods above because benchmarking a prototype implementation showed that this parameter has a significant effect on store performance, depending on workload characteristics.

VersionedBytesStoreSupplier Interface

Here's the VersionedBytesStoreSupplier interface used by the Stores.java methods above:

Code Block
package org.apache.kafka.streams.state;

/**
 * A store supplier that can be used to create one or more versioned key-value stores,
 * specifically, {@link VersionedBytesStore} instances.
 * <p>
 * Rather than representing the returned store as a {@link VersionedKeyValueStore} of
 * type <Bytes, byte[]>, this supplier interface represents the returned store as a
 * {@link KeyValueStore} of type <Bytes, byte[]> (via {@link VersionedBytesStore}) in order to be compatible with
 * existing DSL methods for passing key-value stores such as {@link StreamsBuilder#table(String, Materialized)}
 * and {@link KTable#filter(Predicate, Materialized)}. A {@code VersionedKeyValueStore<Bytes, byte[]>}
 * is represented as a {@code KeyValueStore<Bytes, byte[]>} by interpreting the
 * value bytes as containing record timestamp information in addition to raw record values.
 */
public interface VersionedBytesStoreSupplier extends KeyValueBytesStoreSupplier {

    /**
     * Returns the history retention (in milliseconds) that stores created from this supplier will have.
     * This value is used to set compaction configs on store changelog topics (if relevant).
     *
     * @return history retention, i.e., length of time that old record versions are available for
     *         query from a versioned store
     */
    long historyRetentionMs();
}

As mentioned in the Javadoc, this supplier extends KeyValueBytesStoreSupplier, and therefore returns a store of type KeyValueStore<Bytes, byte[]> rather than a VersionedKeyValueStore<Bytes, byte[]> (as the name suggests), in order to be compatible with existing DSL methods for passing key-value stores, e.g., StreamsBuilder#table() and KTable methods, which are explicitly typed to accept Materialized<K, V, KeyValueStore<Bytes, byte[]>>. The alternative to fitting VersionedKeyValueStore into KeyValueStore in this way is to introduce new versions of all relevant StreamsBuilder and KTable methods to relax the Materialized type accepted by these methods. While this is possible, and we could even deprecate the existing methods in favor of the new ones, it is a large surface area of public interface changes that is best avoided if possible.

The cost of fitting VersionedKeyValueStore into KeyValueStore as proposed is two additional layers of translation, for both DSL and PAPI users, whenever put() or get() is called on a versioned store: the record being written or read must be converted between (keyBytes, valueBytes, timestamp) and (keyBytes, valueBytes + serializedTimestamp) and back. It also means that users who wish to create their own VersionedKeyValueStore implementation (specifically, PAPI users who want to use the provided Stores#versionedKeyValueStoreBuilder method, and DSL users) need to mimic this translation layer from VersionedKeyValueStore to KeyValueStore and back.
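
To make the translation step concrete, here is a minimal sketch of packing a value and its timestamp into one byte array and unpacking it again. The byte layout (an 8-byte timestamp followed by the raw value bytes) is an assumption chosen for illustration, since the KIP does not specify the encoding, and the class and method names are hypothetical. Tombstones (null values) are not handled here.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ValueTimestampCodecSketch {

    /** Packs an 8-byte big-endian timestamp followed by the raw value bytes (assumed layout). */
    public static byte[] encode(byte[] valueBytes, long timestamp) {
        return ByteBuffer.allocate(Long.BYTES + valueBytes.length)
            .putLong(timestamp)
            .put(valueBytes)
            .array();
    }

    /** Reads the timestamp back from the first 8 bytes. */
    public static long timestampOf(byte[] encoded) {
        return ByteBuffer.wrap(encoded).getLong();
    }

    /** Returns a copy of the raw value bytes that follow the timestamp. */
    public static byte[] valueOf(byte[] encoded) {
        return Arrays.copyOfRange(encoded, Long.BYTES, encoded.length);
    }
}
```

Each put() on the KeyValueStore view would decode such an array before calling the versioned put(key, value, timestamp), and each get() would encode the result again, which is the two-way conversion cost described above.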

To alleviate this pain, we could expose an additional helper method for the conversion and/or add an additional method to VersionedBytesStoreSupplier which directly returns a VersionedKeyValueStore<Bytes, byte[]>  if implemented. The latter allows us to save on the two additional layers of translation, at the expense of complicating one of the interfaces. Unless reviewers feel strongly about this (avoiding the extra translation and/or making it easier for users to create their own VersionedKeyValueStore implementations), I propose to leave these options out for now and we can always revisit them later.

For completeness, here's the new VersionedBytesStore interface which VersionedBytesStoreSupplier instances will return. Users will not need to interact with this interface unless they choose to implement their own VersionedBytesStoreSupplier (i.e., in order to implement a custom versioned store to pass to the DSL or to the new Stores#versionedKeyValueStoreBuilder() method).

Code Block
package org.apache.kafka.streams.state;

/**
 * A representation of a versioned key-value store as a {@link KeyValueStore} of type <Bytes, byte[]>.
 * See {@link VersionedBytesStoreSupplier} for more.
 */
public interface VersionedBytesStore extends KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {

    /**
     * The analog of {@link VersionedKeyValueStore#get(Object, long)}.
     */
    byte[] get(Bytes key, long asOfTimestamp);

    /**
     * The analog of {@link VersionedKeyValueStore#delete(Object, long)}.
     */
    byte[] delete(Bytes key, long timestamp);
}

Internally, this interface will be used to assist in the representation of VersionedKeyValueStore<Bytes, byte[]>  as KeyValueStore<Bytes, byte[]> .
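As a rough illustration of what representing a versioned record as plain value bytes can look like, the sketch below packs a timestamp and a raw value into a single byte array and unpacks them again. The class name TimestampedValueCodec and the layout (an 8-byte timestamp followed by the raw value) are assumptions made for this example only; the KIP does not specify the actual byte layout, and tombstones (null values) are not handled here.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not part of the proposal: shows one possible way to
// carry (valueBytes, timestamp) inside a single byte[] value.
final class TimestampedValueCodec {

    private TimestampedValueCodec() { }

    // Assumed layout: 8-byte big-endian timestamp, then the raw value bytes.
    static byte[] encode(final long timestamp, final byte[] rawValue) {
        return ByteBuffer.allocate(Long.BYTES + rawValue.length)
            .putLong(timestamp)
            .put(rawValue)
            .array();
    }

    // Reads the timestamp back from the first 8 bytes.
    static long timestamp(final byte[] encoded) {
        return ByteBuffer.wrap(encoded).getLong();
    }

    // Returns a copy of everything after the 8-byte timestamp prefix.
    static byte[] rawValue(final byte[] encoded) {
        return Arrays.copyOfRange(encoded, Long.BYTES, encoded.length);
    }
}
```

A custom store implementation would perform a conversion of this kind on every put() and get(), which is the translation cost discussed above.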

Additional Interface Changes

TopologyTestDriver

TopologyTestDriver users should be able to get (and interact with) versioned stores from their topology, similar to the existing methods for other store types:

Code Block
public class TopologyTestDriver implements Closeable {

    // ... existing methods ...

    /**
     * Get the {@link VersionedKeyValueStore} with the given name.
     * The store can be a "regular" or global store.
     * <p>
     * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
     * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward.
     *
     * @param name the name of the store
     * @return the key value store, or {@code null} if no {@link VersionedKeyValueStore} has been registered with the given name
     * @see #getAllStateStores()
     * @see #getStateStore(String)
     * @see #getKeyValueStore(String)
     * @see #getTimestampedKeyValueStore(String)
     * @see #getWindowStore(String)
     * @see #getTimestampedWindowStore(String)
     * @see #getSessionStore(String)
     */
    public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(final String name) {
        // ...
    }
}

ValueAndTimestamp

Another interface change needed as part of this proposal is to add the following static factory method to ValueAndTimestamp:

Code Block
public final class ValueAndTimestamp<V> {

    // ... existing methods ...

    /**
     * Create a new {@link ValueAndTimestamp} instance. The provided {@code value} may be {@code null}.
     * 
     * @param value      the value
     * @param timestamp  the timestamp
     * @param <V> the type of the value
     * @return a new {@link ValueAndTimestamp} instance
     */
    public static <V> ValueAndTimestamp<V> makeAllowNullable(
        final V value, final long timestamp) {
        // ...
    }
}

The reason this addition is needed is an implementation detail. The existing DSL processor implementation represents all source table state stores as timestamped key-value stores (link). Unless we lift this restriction and change a significant amount of internal code, versioned key-value stores will therefore have to fit the TimestampedKeyValueStore interface internally. TimestampedKeyValueStore represents inserting a new record into the store as a call to put(K key, ValueAndTimestamp<V> v). To allow inserting tombstones into versioned stores, ValueAndTimestamp therefore needs to allow null values.

Even though this is a public interface change (by virtue of ValueAndTimestamp being public), the usage of ValueAndTimestamp instances with null values will be purely internal. In other words, this change is not strictly needed as a public interface change and could also be achieved through refactoring. That said, the cost of introducing this new method seems low and I'd like to propose it.
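To make the difference concrete, here is a small self-contained sketch using a stand-in class rather than the real ValueAndTimestamp. The name NullableValueAndTimestamp is hypothetical. The sketch assumes that the existing make(...) factory returns null when given a null value, which is why a tombstone cannot be represented with it, and shows how a makeAllowNullable(...) factory could differ.

```java
// Hypothetical stand-in for ValueAndTimestamp, for illustration only.
final class NullableValueAndTimestamp<V> {

    private final V value;
    private final long timestamp;

    private NullableValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    // Assumed behavior of the existing factory: a null value yields a null
    // instance, so the timestamp of a tombstone is lost.
    static <V> NullableValueAndTimestamp<V> make(final V value, final long timestamp) {
        return value == null ? null : new NullableValueAndTimestamp<>(value, timestamp);
    }

    // Sketch of the proposed factory: always returns an instance, so a
    // tombstone (null value) keeps its timestamp.
    static <V> NullableValueAndTimestamp<V> makeAllowNullable(final V value, final long timestamp) {
        return new NullableValueAndTimestamp<>(value, timestamp);
    }

    V value() {
        return value;
    }

    long timestamp() {
        return timestamp;
    }
}
```

With the second factory, a call such as put(key, makeAllowNullable(null, ts)) can carry both the tombstone and the timestamp at which it applies.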

Compatibility, Deprecation, and Migration Plan

This KIP introduces a new type of store without deprecating any existing interfaces. Unless a user explicitly updates their application code to use the new store, this KIP will have no effect on their applications (versioned stores are not used anywhere by default).

The RocksDB format used for versioned stores is not compatible with the existing format for non-versioned stores.

However, RocksDB-based versioned and non-versioned stores will use the same changelog topic format, though their changelog topic configurations will differ. Specifically, the changelog bytes format for RocksDB-based versioned and non-versioned stores is the same, but changelog topics for versioned stores need min.compaction.lag.ms set to a value suitable for the desired history retention of the versioned store. The RocksDB versioned store implementation will set min.compaction.lag.ms equal to history retention plus 24 hours, where the purpose of this additional buffer is to account for the broker's usage of wall clock time in topic compactions (analogous to the extra 24 hours changelog retention for windowed stores today). Changelog topic configs will be set only on changelog topic creation, and will not be verified if the changelog topic already exists.

In light of the above, users can use the following manual procedure to update an existing application with a non-versioned store to use a versioned store instead:

  1. Stop the application
  2. Delete all local state (for the store being updated) from all instances
  3. Update the changelog topic configurations to set min.compaction.lag.ms to a value suitable for the desired history retention (e.g., history retention plus some buffer to account for broker wall clock time usage in topic cleanup)
  4. Update the application code to use a versioned store
  5. Restart the application
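For step 3, the value to set can be computed from the desired history retention. The sketch below uses the same 24-hour buffer that the RocksDB versioned store implementation is described as using when it creates the changelog topic. The class and method names are hypothetical, and a different buffer may be appropriate for a given deployment.

```java
import java.time.Duration;

// Hypothetical helper, not part of the proposal: computes a candidate value
// for min.compaction.lag.ms from a store's history retention.
final class ChangelogCompactionLag {

    // Buffer to account for the broker's use of wall clock time (assumed: 24 hours).
    static final long BUFFER_MS = Duration.ofHours(24).toMillis();

    private ChangelogCompactionLag() { }

    // History retention plus buffer; throws ArithmeticException on overflow.
    static long minCompactionLagMs(final long historyRetentionMs) {
        return Math.addExact(historyRetentionMs, BUFFER_MS);
    }

    public static void main(final String[] args) {
        // Example input: a store configured with 7 days of history retention.
        final long historyRetentionMs = Duration.ofDays(7).toMillis();
        System.out.println("min.compaction.lag.ms=" + minCompactionLagMs(historyRetentionMs));
    }
}
```

The resulting number is what would be applied to the existing changelog topic, since topic configs are not updated automatically once the topic exists.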

There are no plans to support a non-manual upgrade procedure or a live migration procedure at this time. In the future, it could be nice to make versioned stores the default since a non-versioned store is simply a special case of a versioned store (with history retention 0) but that's far out of scope for this KIP.

Test Plan

The RocksDB-based versioned store implementation will be tested with the Processor API: put, get, and timestamp-based get methods will have their results validated. 

The manual procedure described above for updating an application using a non-versioned store to use a versioned store will be tested as well. 

Rejected Alternatives

Versioned Store Interface

History retention and get(key, asOfTimestamp)

In the event that get(key, asOfTimestamp) is called with a timestamp bound older than the specified history retention, instead of returning null (and logging a warning) as proposed above, other design options include (1) throwing an exception or (2) updating the return type from VersionedRecord<V> to Optional<VersionedRecord<V>> and returning an empty optional to indicate that the timestamp bound was invalid. The first option is not very user-friendly. The second option complicates the interface and diverges the return types of get(key) and get(key, asOfTimestamp) .

Regarding the edge case where get(key, asOfTimestamp) is called with a timestamp bound older than the specified history retention but the latest record version for the key satisfies the timestamp bound, the proposal above says that the latest record version should be returned in this case, rather than rejecting the timestamped query and returning null. Returning the record is preferable because its existence (as the latest value for the key) is guaranteed in the store, and is accessible from get(key) anyway. The alternative of returning null, i.e., strict enforcement of the store's history retention, is not very user-friendly as users would then have to determine whether to call get(key) or get(key, timestamp) to account for this edge case.
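The following toy in-memory store sketches the lookup rule described in the two paragraphs above. It is not the proposed RocksDB implementation: the class name ToyVersionedStore, the use of String keys and values, and the absence of any cleanup of old versions or logging are simplifications assumed for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy store, for illustrating query semantics only.
final class ToyVersionedStore {

    private final long historyRetentionMs;
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();
    private long observedStreamTime = Long.MIN_VALUE;

    ToyVersionedStore(final long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    // Stores one version per key and timestamp; a null value is a tombstone.
    void put(final String key, final String value, final long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        observedStreamTime = Math.max(observedStreamTime, timestamp);
    }

    String get(final String key, final long asOfTimestamp) {
        final TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null; // key never seen
        }
        // Edge case: the latest version satisfies the bound, so it is
        // returned even if the bound is older than history retention.
        final Map.Entry<Long, String> latest = history.lastEntry();
        if (latest.getKey() <= asOfTimestamp) {
            return latest.getValue();
        }
        // Bound is older than history retention: reject the query with null.
        if (asOfTimestamp < observedStreamTime - historyRetentionMs) {
            return null;
        }
        // Otherwise return the newest version at or before the bound.
        final Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

In this sketch a caller never has to choose between get(key) and get(key, asOfTimestamp) to reach the latest value, which is the usability argument made above.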

ValueAndTimestamp as return type of get(key, asOfTimestamp) / Additional return timestamps from get(key, asOfTimestamp)

The proposed return type from get(key, asOfTimestamp) of VersionedRecord<V> returns the record value and timestamp (i.e., validFrom timestamp) found for the given key (and timestamp bound). In some situations, it may be useful for users to additionally have the validTo timestamp associated with the record. In order to allow for this possibility in the future, the return type of get(key, asOfTimestamp) is a new type, VersionedRecord , rather than the existing ValueAndTimestamp<V>  type, even though the two are largely the same today. We considered keeping the interface simple by not introducing a new type, but felt that the flexibility of evolving this type in the future was worth the addition of a new class. However, we will not add additional return timestamps at this time. They can be added once we have more confidence that they will be useful for users.

Return null with timestamp from get()

In the event that get(key) or get(key, asOfTimestamp) finds that the latest record version associated with a particular key (and possible timestamp bound) is a tombstone, rather than returning null the versioned store could instead return a non-null VersionedRecord with null value (and relevant timestamp). This would allow users to distinguish between the key not being found in the store at all (null VersionedRecord ) versus the key being found with a tombstone for the latest record (non-null VersionedRecord with null value). This proposal was rejected since the use cases for making such a distinction are limited.

Grace period separately configurable from history retention

"History retention" and "grace period" control how far back in time (relative to the current observed stream time) old reads and writes, respectively, will be accepted by the store. In the proposal above, users specify a single value which is used for both parameters, though in the future we could add an additional option for users to specify the two separately. (Today, users specify an explicit value for history retention, and grace period is automatically set to the same value. There are no compatibility concerns with introducing a new option for grace period in the future.)

Support for Upgrades

Additional support for upgrading a non-versioned store to a versioned store beyond the manual steps above was rejected on the basis of complexity. Automatic upgrades are too complex, and it's not clear that additional tooling for manual upgrades would be valuable to users at this time. It's better to get the new versioned interfaces out sooner so that they can bake and be iterated on, rather than block the first version on additional complexity.

...