
...

  • a new interface for versioned stores: VersionedKeyValueStore<K, V> extends StateStore
  • a new interface for versioned store suppliers, and a helper interface: VersionedBytesStoreSupplier extends KeyValueBytesStoreSupplier, with helper VersionedBytesStore
  • three new methods in Stores.java:
    • two for creating a persistent, versioned store supplier: Stores#persistentVersionedKeyValueStore(...) plus an overload
    • another for creating a StoreBuilder from a versioned supplier: Stores#versionedKeyValueStoreBuilder(...)
  • a new static method in ValueAndTimestamp.java for creating ValueAndTimestamp instances where the value may be null: ValueAndTimestamp#makeAllowNullable(...)
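The following minimal sketch contrasts the ordinary factory with the new nullable-value factory. It uses a hypothetical, simplified stand-in class (SimpleValueAndTimestamp), not the real ValueAndTimestamp. It assumes, as with the existing ValueAndTimestamp#make(...), that the ordinary factory returns null for a null value, while makeAllowNullable(...) always returns an instance.

```java
/**
 * Hypothetical, simplified stand-in for ValueAndTimestamp (not the real class),
 * used only to contrast make(...) with makeAllowNullable(...).
 */
public final class SimpleValueAndTimestamp<V> {
    private final V value; // null only when created via makeAllowNullable(...)
    private final long timestamp;

    private SimpleValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    /** Like the existing factory: a null value yields a null result, not an instance. */
    public static <V> SimpleValueAndTimestamp<V> make(final V value, final long timestamp) {
        return value == null ? null : new SimpleValueAndTimestamp<>(value, timestamp);
    }

    /** Like the proposed factory: always returns an instance, even when the value is null. */
    public static <V> SimpleValueAndTimestamp<V> makeAllowNullable(final V value, final long timestamp) {
        return new SimpleValueAndTimestamp<>(value, timestamp);
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }

    public static void main(final String[] args) {
        final SimpleValueAndTimestamp<String> viaMake = make(null, 10L);
        final SimpleValueAndTimestamp<String> viaAllowNullable = makeAllowNullable(null, 10L);
        System.out.println(viaMake == null);                  // true
        System.out.println(viaAllowNullable.value() == null); // true: a null value that keeps its timestamp
        System.out.println(viaAllowNullable.timestamp());     // 10
    }
}
```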

...

Code Block
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;

/**
 * A key-value store that stores multiple record versions per key, and supports timestamp-based
 * retrieval operations to return the latest record (per key) as of a specified timestamp.
 * Only one record is stored per key and timestamp, i.e., a second call to
 * {@link #put(Object, Object, long)} with the same key and timestamp will replace the first.
 * <p>
 * Each store instance has an associated, fixed-duration "history retention" which specifies
 * how long old record versions should be kept for. In particular, a versioned store guarantees
 * to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp
 * bound is within history retention of the current observed stream time. (Queries with timestamp
 * bound older than the specified history retention are considered invalid.)
 *
 * @param <K> The key type
 * @param <V> The value type
 */
public interface VersionedKeyValueStore<K, V> extends StateStore {

    /**
     * Add a new record version associated with this key.
     *
     * @param key       The key
     * @param value     The value, it can be {@code null};
     *                  if the serialized bytes are also {@code null} it is interpreted as a delete
     * @param timestamp The timestamp for this record version
     * @throws NullPointerException If {@code null} is used for key.
     */
    void put(K key, V value, long timestamp);

    /**
     * Delete the value associated with this key from the store, at the specified timestamp
     * (if there is such a value), and return the deleted value.
     * <p>
     * This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)}
     * followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}.
     *
     * @param key       The key
     * @param timestamp The timestamp for this delete
     * @return The deleted value and its timestamp, i.e., the result of
     *         {@link #get(Object, long) #get(key, timestamp)} prior to the delete
     *         (which may be {@code null})
     * @throws NullPointerException If {@code null} is used for key.
     */
    ValueAndTimestamp<V> delete(K key, long timestamp);

    /**
     * Get the latest (by timestamp) record associated with this key.
     *
     * @param key The key to fetch
     * @return The value and timestamp of the latest record associated with this key, or
     *         {@code null} if either (1) the store contains no records for this key or (2) the
     *         latest record for this key is a tombstone.
     * @throws NullPointerException       If {@code null} is used for key.
     * @throws InvalidStateStoreException If the store is not initialized.
     */
    ValueAndTimestamp<V> get(K key);

    /**
     * Get the latest record associated with this key with timestamp not exceeding the specified
     * timestamp bound.
     *
     * @param key           The key to fetch
     * @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp of the latest record associated with this key
     *         satisfying the provided timestamp bound, or {@code null} if any of
     *         (1) the store contains no records for this key, (2) the latest record
     *         for this key satisfying the provided timestamp bound is a tombstone, or
     *         (3) the provided timestamp bound is older than this store's history retention
     *         (i.e., this store no longer contains data for the provided timestamp bound).
     * @throws NullPointerException       If {@code null} is used for key.
     * @throws InvalidStateStoreException If the store is not initialized.
     */
    ValueAndTimestamp<V> get(K key, long asOfTimestamp);
}

Note that this proposal intentionally omits most methods from the existing KeyValueStore interface in order to keep the new interface simple. It could be nice to add additional methods in the future, such as rangeKey() methods to enable the foreign-key join subscription store use case, but this is deferred to a future KIP in order to align on these basic interfaces first.
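The following toy, in-memory sketch illustrates the put/get/delete contract of the interface above. It is not the proposed RocksDB-backed implementation. It makes the following assumptions:

* A hypothetical VT record stands in for ValueAndTimestamp.
* Each key's versions are kept in a TreeMap keyed by timestamp, and a null value marks a tombstone.
* History retention is ignored.

delete(...) is written literally as get(key, timestamp) followed by put(key, null, timestamp).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

public class InMemoryVersionedStoreSketch {

    /** Hypothetical stand-in for ValueAndTimestamp<V>. */
    record VT<V>(V value, long timestamp) { }

    /** Toy store illustrating the contract only; no persistence and no history retention. */
    static final class InMemoryVersionedStore<K, V> {
        // Per key: record versions ordered by timestamp; a null value is a tombstone.
        private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

        void put(final K key, final V value, final long timestamp) {
            Objects.requireNonNull(key, "key cannot be null");
            // TreeMap#put replaces any existing version with the same key and timestamp.
            versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }

        VT<V> get(final K key) {
            // Latest by timestamp == latest version with timestamp <= Long.MAX_VALUE.
            return get(key, Long.MAX_VALUE);
        }

        VT<V> get(final K key, final long asOfTimestamp) {
            Objects.requireNonNull(key, "key cannot be null");
            final TreeMap<Long, V> history = versions.get(key);
            if (history == null) {
                return null; // the store contains no records for this key
            }
            // Latest version with timestamp <= asOfTimestamp (the bound is inclusive).
            final Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
            if (entry == null || entry.getValue() == null) {
                return null; // nothing at or before the bound, or that record is a tombstone
            }
            return new VT<>(entry.getValue(), entry.getKey());
        }

        VT<V> delete(final K key, final long timestamp) {
            final VT<V> previous = get(key, timestamp); // get(key, timestamp) ...
            put(key, null, timestamp);                  // ... followed by put(key, null, timestamp)
            return previous;
        }
    }

    public static void main(final String[] args) {
        final InMemoryVersionedStore<String, String> store = new InMemoryVersionedStore<>();
        store.put("k", "v1", 10L);
        store.put("k", "v2", 20L);
        System.out.println(store.get("k").value());         // v2 (latest version)
        System.out.println(store.get("k", 15L).value());    // v1 (latest version with timestamp <= 15)
        System.out.println(store.get("k", 5L));             // null (no version at or before 5)
        System.out.println(store.delete("k", 30L).value()); // v2 (the deleted value)
        System.out.println(store.get("k"));                 // null (latest record is a tombstone)
        System.out.println(store.get("k", 25L).value());    // v2 (older history is still readable)
    }
}
```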

...

Each store has an associated, fixed-duration history retention which specifies how long old record versions should be kept for. In particular, a versioned store guarantees to return accurate results for calls to get(key, asOfTimestamp) where the provided timestamp bound is within history retention of the current observed stream time. (If the timestamp bound is outside the specified history retention, a warning is logged and null is returned.)
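The following minimal sketch shows the history-retention check. It makes the following assumptions:

* There is a single key, for brevity.
* Observed stream time is the largest timestamp passed to put().
* Timestamps are non-negative.
* A bound exactly equal to stream time minus history retention is still valid. This boundary choice is an assumption.

The class and method names are hypothetical, and the toy never expires old versions; it only validates the bound.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;

public class HistoryRetentionSketch {
    private static final Logger LOG = Logger.getLogger(HistoryRetentionSketch.class.getName());

    private final long historyRetentionMs;
    private long observedStreamTime = Long.MIN_VALUE;               // max timestamp seen; MIN_VALUE = none yet
    private final TreeMap<Long, String> versions = new TreeMap<>(); // versions of a single key

    HistoryRetentionSketch(final long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    void put(final String value, final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        versions.put(timestamp, value);
    }

    /** get(key, asOfTimestamp) for the single key; logs a warning and returns null if the bound is too old. */
    String getAsOf(final long asOfTimestamp) {
        if (observedStreamTime != Long.MIN_VALUE
                && asOfTimestamp < observedStreamTime - historyRetentionMs) {
            LOG.warning("Timestamp bound " + asOfTimestamp
                    + " is older than history retention; returning null");
            return null;
        }
        final Map.Entry<Long, String> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(final String[] args) {
        final HistoryRetentionSketch store = new HistoryRetentionSketch(100L);
        store.put("v1", 10L);
        store.put("v2", 200L);                   // observed stream time is now 200
        System.out.println(store.getAsOf(150L)); // v1: 150 >= 200 - 100, so the bound is valid
        System.out.println(store.getAsOf(50L));  // null, with a warning: 50 < 200 - 100
    }
}
```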

...

  • a validFrom timestamp. This timestamp is explicitly associated with the record as part of the put() call to the store; i.e., this is the record's timestamp.
  • a validTo timestamp. This is the timestamp of the next record (or deletion) associated with the same key, and is implicitly associated with the record. This timestamp can change as new records are inserted into the store.

The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and can change as new record versions are inserted into the store (and validTo changes as a result).
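The following sketch derives each version's validity interval from one key's version timestamps. It assumes a hypothetical Long.MAX_VALUE sentinel for the validTo of the latest version. Inserting a record out of order shows how an existing record's validTo changes while its validFrom stays fixed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ValidityIntervalSketch {
    /** Hypothetical sentinel validTo for the latest version: no later record (or deletion) yet. */
    static final long NO_VALID_TO = Long.MAX_VALUE;

    /** One record version with its validity interval [validFrom, validTo). */
    record Interval(String value, long validFrom, long validTo) { }

    /** Derives [validFrom, validTo) for every version of one key from the versions' timestamps. */
    static List<Interval> intervals(final TreeMap<Long, String> versions) {
        final List<Interval> result = new ArrayList<>();
        for (final Map.Entry<Long, String> e : versions.entrySet()) {
            // validFrom is the record's own timestamp; validTo is the next version's timestamp.
            final Long next = versions.higherKey(e.getKey());
            result.add(new Interval(e.getValue(), e.getKey(), next == null ? NO_VALID_TO : next));
        }
        return result;
    }

    static void print(final List<Interval> intervals) {
        for (final Interval i : intervals) {
            System.out.println(i.value() + ": [" + i.validFrom() + ", " + i.validTo() + ")");
        }
    }

    public static void main(final String[] args) {
        final TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(10L, "a");
        versions.put(30L, "c");
        print(intervals(versions)); // a: [10, 30), then c with the sentinel validTo
        versions.put(20L, "b");     // out-of-order insert: a's validTo shrinks from 30 to 20
        print(intervals(versions));
    }
}
```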

Old record versions are stored in segment stores according to their validTo timestamps. The use of segments here is analogous to that in the existing RocksDB implementation for windowed stores. Because records are assigned to segments based on their validTo timestamps, an entire segment can be expired at once when none of the records it contains are still relevant under the store's history retention. (One difference from windowed stores: in a windowed store each segment is its own RocksDB instance, whereas all segments of a versioned store will share the same physical RocksDB instance, which allows for many more segments than windowed stores use today.)
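The segment arithmetic can be sketched as follows. The sketch assumes, by analogy with windowed stores, that a segment ID is the timestamp divided by a fixed segment interval. The interval parameter and the method names are hypothetical.

Expiry follows from the validity rule. A record whose validTo is at or before stream time minus history retention cannot be the answer to any valid get(key, asOfTimestamp) call. A segment can therefore be dropped once its largest possible validTo has reached that point. Timestamps are assumed non-negative.

```java
public class SegmentSketch {
    /** Segment holding an old record version, chosen by its validTo (hypothetical fixed interval). */
    static long segmentId(final long validTo, final long segmentIntervalMs) {
        return validTo / segmentIntervalMs;
    }

    /** Largest validTo that can land in the given segment. */
    static long segmentEnd(final long segmentId, final long segmentIntervalMs) {
        return (segmentId + 1) * segmentIntervalMs - 1;
    }

    /**
     * A record with validTo <= (observedStreamTime - historyRetention) is not valid at any
     * timestamp bound still within history retention, so the whole segment can be dropped
     * once even its largest possible validTo satisfies that condition.
     */
    static boolean isExpired(final long segmentId, final long segmentIntervalMs,
                             final long observedStreamTime, final long historyRetentionMs) {
        return segmentEnd(segmentId, segmentIntervalMs) <= observedStreamTime - historyRetentionMs;
    }

    public static void main(final String[] args) {
        final long interval = 100L;
        final long retention = 250L;
        final long streamTime = 1000L; // oldest valid timestamp bound: 1000 - 250 = 750
        System.out.println(segmentId(730L, interval));                      // 7
        System.out.println(isExpired(6L, interval, streamTime, retention)); // true: 699 <= 750
        System.out.println(isExpired(7L, interval, streamTime, retention)); // false: 799 > 750
    }
}
```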

...

Code Block
package org.apache.kafka.streams.state;

import org.apache.kafka.common.utils.Bytes;

/**
 * A representation of a versioned key-value store as a {@link KeyValueStore} of type {@code <Bytes, byte[]>}.
 * See {@link VersionedBytesStoreSupplier} for more.
 */
public interface VersionedBytesStore extends KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {

    /**
     * The analog of {@link VersionedKeyValueStore#get(Object, long)}.
     */
    byte[] get(Bytes key, long asOfTimestamp);
}

Internally, this interface will be used to assist in the representation of VersionedKeyValueStore<Bytes, byte[]> as KeyValueStore<Bytes, byte[]>.
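One way to picture this representation is an adapter that turns a plain key-value put(key, rawValueAndTimestamp) into a versioned put. The sketch below makes the following assumptions:

* Values use the timestamped layout of TimestampedBytesStore: an 8-byte big-endian timestamp followed by the serialized value.
* String keys stand in for Bytes.
* Tombstones are not handled.

ToyVersionedBytesStore and KeyValueView are hypothetical names, not part of this proposal.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class VersionedBytesAdapterSketch {

    /** Hypothetical toy versioned byte store (String keys stand in for Bytes). */
    static final class ToyVersionedBytesStore {
        private final Map<String, TreeMap<Long, byte[]>> versions = new HashMap<>();

        void put(final String key, final byte[] value, final long timestamp) {
            versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }

        byte[] get(final String key, final long asOfTimestamp) {
            final TreeMap<Long, byte[]> history = versions.get(key);
            final Map.Entry<Long, byte[]> e = history == null ? null : history.floorEntry(asOfTimestamp);
            return e == null ? null : e.getValue();
        }
    }

    /** Hypothetical adapter exposing a plain key-value put(key, rawValueAndTimestamp). */
    static final class KeyValueView {
        private final ToyVersionedBytesStore inner;

        KeyValueView(final ToyVersionedBytesStore inner) {
            this.inner = inner;
        }

        // Assumed layout: 8-byte big-endian timestamp, then the serialized value.
        static byte[] encode(final byte[] value, final long timestamp) {
            return ByteBuffer.allocate(Long.BYTES + value.length).putLong(timestamp).put(value).array();
        }

        void put(final String key, final byte[] rawValueAndTimestamp) {
            final ByteBuffer buf = ByteBuffer.wrap(rawValueAndTimestamp);
            final long timestamp = buf.getLong();           // consume the 8-byte timestamp prefix
            final byte[] value = new byte[buf.remaining()];
            buf.get(value);                                 // the remaining bytes are the value
            inner.put(key, value, timestamp);               // becomes a versioned put
        }
    }

    public static void main(final String[] args) {
        final ToyVersionedBytesStore store = new ToyVersionedBytesStore();
        final KeyValueView view = new KeyValueView(store);
        view.put("k", KeyValueView.encode("v1".getBytes(StandardCharsets.UTF_8), 10L));
        view.put("k", KeyValueView.encode("v2".getBytes(StandardCharsets.UTF_8), 20L));
        System.out.println(new String(store.get("k", 15L), StandardCharsets.UTF_8)); // v1
        System.out.println(new String(store.get("k", 25L), StandardCharsets.UTF_8)); // v2
    }
}
```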

...

Versioned Store Interface

History retention and get(key, asOfTimestamp)

In the event that get(key, asOfTimestamp) is called with a timestamp bound older than the specified history retention, instead of returning null (and logging a warning) as proposed above, other design options include (1) throwing an exception, or (2) updating the return type from ValueAndTimestamp<V> to Optional<ValueAndTimestamp<V>> and returning an empty optional to indicate that the timestamp bound was invalid. The first option is not very user-friendly. The second option complicates the interface and diverges the return types of get(key) and get(key, asOfTimestamp).

Additional return timestamps from get(key, asOfTimestamp)

The proposed return type of get(key, asOfTimestamp), ValueAndTimestamp<V>, returns the record value and timestamp (i.e., the validFrom timestamp) found for the given key and timestamp bound. In some situations, it may be useful for users to also have the validTo timestamp associated with the record. To provide this additional timestamp, the return type of get(key, asOfTimestamp) could instead be a new type, e.g., VersionedRecord, which has a value and two associated timestamps. This again complicates the interface and diverges the return types of get(key) and get(key, asOfTimestamp). (It's not meaningful to return a validTo timestamp for records returned from get(key), since the validTo timestamp will always be a sentinel value signaling that the record is the latest record associated with the given key.)
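The following sketch shows what such a return type might carry. It uses a hypothetical VersionedRecord record and a Long.MAX_VALUE sentinel; both are assumptions, not part of this proposal. A query with the largest possible bound, which is what get(key) amounts to, always yields the sentinel validTo. This is why the extra timestamp adds nothing for get(key).

```java
import java.util.Map;
import java.util.TreeMap;

public class VersionedRecordSketch {
    /** Hypothetical sentinel validTo marking the latest version of a key. */
    static final long LATEST = Long.MAX_VALUE;

    /** Hypothetical alternative return type: a value plus its validity interval [validFrom, validTo). */
    record VersionedRecord<V>(V value, long validFrom, long validTo) { }

    /** get(key, asOfTimestamp) over one key's versions (null values are tombstones). */
    static <V> VersionedRecord<V> getAsOf(final TreeMap<Long, V> versions, final long asOfTimestamp) {
        final Map.Entry<Long, V> e = versions.floorEntry(asOfTimestamp);
        if (e == null || e.getValue() == null) {
            return null; // no version at or before the bound, or that version is a tombstone
        }
        // validTo is the timestamp of the next version (record or tombstone), if any.
        final Long next = versions.higherKey(e.getKey());
        return new VersionedRecord<>(e.getValue(), e.getKey(), next == null ? LATEST : next);
    }

    public static void main(final String[] args) {
        final TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(10L, "a");
        versions.put(20L, "b");
        final VersionedRecord<String> old = getAsOf(versions, 15L);
        System.out.println(old.value() + " valid in [" + old.validFrom() + ", " + old.validTo() + ")"); // a valid in [10, 20)
        // get(key) corresponds to the largest possible bound, so its validTo is always the sentinel.
        final VersionedRecord<String> latest = getAsOf(versions, Long.MAX_VALUE);
        System.out.println(latest.validTo() == LATEST); // true
    }
}
```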

...

In the event that get(key) or get(key, asOfTimestamp) finds that the latest record version associated with a particular key (and optional timestamp bound) is a tombstone, rather than returning null the versioned store could instead return a non-null ValueAndTimestamp with null value (and the relevant timestamp). This would allow users to distinguish between the key not being found in the store at all (null ValueAndTimestamp) and the key being found with a tombstone as the latest record (non-null ValueAndTimestamp with null value). This proposal was rejected since the use cases for making such a distinction are limited.
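The following short sketch contrasts the rejected alternative with the proposed behavior over a single key's versions. The helper names are hypothetical, and a null value marks a tombstone. The VT record allows a null value, in the spirit of ValueAndTimestamp#makeAllowNullable(...).

```java
import java.util.Map;
import java.util.TreeMap;

public class TombstoneSemanticsSketch {
    /** Hypothetical stand-in for ValueAndTimestamp whose value may be null. */
    record VT<V>(V value, long timestamp) { }

    /** Proposed behavior: null both when no version exists and when the latest version is a tombstone. */
    static <V> VT<V> getProposed(final TreeMap<Long, V> versions, final long asOfTimestamp) {
        final Map.Entry<Long, V> e = versions.floorEntry(asOfTimestamp);
        return (e == null || e.getValue() == null) ? null : new VT<>(e.getValue(), e.getKey());
    }

    /** Rejected alternative: a tombstone yields a non-null result whose value is null. */
    static <V> VT<V> getAlternative(final TreeMap<Long, V> versions, final long asOfTimestamp) {
        final Map.Entry<Long, V> e = versions.floorEntry(asOfTimestamp);
        return e == null ? null : new VT<>(e.getValue(), e.getKey());
    }

    public static void main(final String[] args) {
        final TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(10L, "a");
        versions.put(20L, null);                          // tombstone: the key was deleted at 20
        System.out.println(getProposed(versions, 5L));    // null (no records yet)
        System.out.println(getProposed(versions, 25L));   // null (tombstone), same result as above
        System.out.println(getAlternative(versions, 5L)); // null (no records yet)
        final VT<String> deleted = getAlternative(versions, 25L);
        System.out.println(deleted.value() + " @ " + deleted.timestamp()); // null @ 20
    }
}
```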

...