Comment: clarify behavior of timestamped get in a specific edge case

...

Code Block
package org.apache.kafka.streams.state;  

/**
 * A key-value store that stores multiple record versions per key, and supports timestamp-based
 * retrieval operations to return the latest record (per key) as of a specified timestamp.
 * Only one record is stored per key and timestamp, i.e., a second call to
 * {@link #put(Object, Object, long)} with the same key and timestamp will replace the first.
 * <p>
 * Each store instance has an associated, fixed-duration "history retention" which specifies
 * how long old record versions should be kept for. In particular, a versioned store guarantees
 * to return accurate results for calls to {@link #get(Object, long)} where the provided timestamp
 * bound is within history retention of the current observed stream time. (Queries with timestamp
 * bound older than the specified history retention are considered invalid.)
 * <p>
 * The store's "history retention" also doubles as its "grace period," which determines how far
 * back in time writes to the store will be accepted. A versioned store will not accept writes
 * (inserts, updates, or deletions) if the timestamp associated with the write is older than the
 * current observed stream time by more than the grace period.
 *
 * @param <K> The key type
 * @param <V> The value type
 */ 
public interface VersionedKeyValueStore<K, V> extends StateStore {

    /**
     * Add a new record version associated with this key.
     * <p>
     * If the timestamp associated with the new record version is older than the store's
     * grace period (i.e., history retention) relative to the current observed stream time,
     * then the record will not be added.
     *
     * @param key       The key
     * @param value     The value, it can be {@code null};
     *                  if the serialized bytes are also {@code null} it is interpreted as a delete
     * @param timestamp The timestamp for this record version
     * @throws NullPointerException If {@code null} is used for key.
     */
    void put(K key, V value, long timestamp); 

    /**
     * Delete the value associated with this key from the store, at the specified timestamp
     * (if there is such a value), and return the deleted value.
     * <p>
     * If the timestamp associated with this deletion is older than the store's grace period
     * (i.e., history retention) relative to the current observed stream time, then the deletion
     * will not be performed and {@code null} will be returned.
     * <p>
     * This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)}
     * followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}, with
     * a caveat that the return value is always {@code null} if the deletion timestamp
     * is older than the store's grace period (i.e., history retention), regardless of
     * what {@link #get(Object, long) #get(key, timestamp)} would return.
     *
     * @param key       The key
     * @param timestamp The timestamp for this delete
     * @return The value and timestamp of the record associated with this key as of
     *         the deletion timestamp (inclusive), or {@code null} if no such record exists
     *         (including if the deletion timestamp is older than this store's history
     *         retention time, i.e., the store no longer contains data for the provided
     *         timestamp). Note that the record timestamp {@code r.timestamp()} of the
     *         returned {@link VersionedRecord} may be smaller than the provided deletion
     *         timestamp.
     * @throws NullPointerException If {@code null} is used for key.
     */
    VersionedRecord<V> delete(K key, long timestamp);

    /**
     * Get the latest (by timestamp) record associated with this key.
     *
     * @param key The key to fetch
     * @return The value and timestamp of the latest record associated with this key, or
     *         {@code null} if either (1) the store contains no records for this key or (2) the
     *         latest record for this key is a tombstone.
     * @throws NullPointerException       If null is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K key);

    /**
     * Get the latest record associated with this key with timestamp not exceeding the specified
     * timestamp bound.
     *
     * @param key           The key to fetch
     * @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp of the record associated with this key
     *         as of the provided timestamp, or {@code null} if no such record exists
     *         (including if the provided timestamp bound is older than this store's history
     *         retention time, i.e., the store no longer contains data for the provided
     *         timestamp). Note that the record timestamp {@code r.timestamp()} of the
     *         returned {@link VersionedRecord} may be smaller than the provided timestamp
     *         bound. Additionally, if the latest record version for the key is eligible
     *         for the provided timestamp bound, then that record will be returned even if
     *         the timestamp bound is older than the store's history retention.
     * @throws NullPointerException       If null is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K key, long asOfTimestamp);
}
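The following minimal in-memory sketch illustrates the documented semantics under stated assumptions. `VersionedStoreSketch` and its `Rec` holder are hypothetical stand-ins. They are not Kafka's `VersionedRecord` or any Kafka store implementation. The sketch assumes the following:

* The observed stream time is the largest timestamp written so far.
* A `null` value acts as a tombstone.
* Old versions are never physically expired. History retention is enforced only when reads and writes are checked.
* No warning is logged.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified in-memory model of the VersionedKeyValueStore semantics.
// It does not implement the Kafka interface and is not how Kafka Streams stores data.
class VersionedStoreSketch<K, V> {

    // Minimal stand-in for VersionedRecord: a value plus the timestamp of its version.
    static final class Rec<T> {
        public final T value;
        public final long timestamp;
        Rec(T value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    private final long historyRetention;              // doubles as the grace period
    private long observedStreamTime = Long.MIN_VALUE; // assumption: max timestamp written so far
    // Per key: version timestamp -> value, where a null value is a tombstone.
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    VersionedStoreSketch(long historyRetention) { this.historyRetention = historyRetention; }

    // True if the timestamp is older than observed stream time by more than the retention.
    private boolean outsideRetention(long timestamp) {
        return observedStreamTime != Long.MIN_VALUE
            && timestamp < observedStreamTime - historyRetention;
    }

    void put(K key, V value, long timestamp) {
        if (key == null) throw new NullPointerException("key");
        if (outsideRetention(timestamp)) return; // outside grace period: write is dropped
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        observedStreamTime = Math.max(observedStreamTime, timestamp);
    }

    Rec<V> delete(K key, long timestamp) {
        if (key == null) throw new NullPointerException("key");
        if (outsideRetention(timestamp)) return null; // not performed, always null
        Rec<V> previous = get(key, timestamp);         // get(key, timestamp) ...
        put(key, null, timestamp);                     // ... followed by put(key, null, timestamp)
        return previous;
    }

    Rec<V> get(K key) {
        if (key == null) throw new NullPointerException("key");
        TreeMap<Long, V> history = versions.get(key);
        return history == null ? null : toRec(history.lastEntry());
    }

    Rec<V> get(K key, long asOfTimestamp) {
        if (key == null) throw new NullPointerException("key");
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) return null;
        // Largest version timestamp that does not exceed the (inclusive) bound.
        Map.Entry<Long, V> e = history.floorEntry(asOfTimestamp);
        if (e == null) return null;
        // Bound older than history retention: only the latest version may still be returned.
        if (outsideRetention(asOfTimestamp) && e.getKey() < history.lastKey()) return null;
        return toRec(e);
    }

    // Tombstones (null values) are reported as "no record".
    private Rec<V> toRec(Map.Entry<Long, V> e) {
        return (e == null || e.getValue() == null) ? null : new Rec<>(e.getValue(), e.getKey());
    }
}
```

Consider a history retention of 10 with versions written at 100 and 105. Once stream time reaches 105, a write at timestamp 90 is dropped. However, `get("k", 103L)` still resolves to the version written at 100.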

...

Each store has an associated, fixed-duration history retention which specifies how long old record versions should be kept for. In particular, a versioned store guarantees to return accurate results for calls to get(key, asOfTimestamp) where the provided timestamp bound is within history retention of the current observed stream time. (If the timestamp bound is outside the specified history retention, then a record is still returned if the latest record version for the key satisfies the timestamp bound. Otherwise, a warning is logged and null is returned.)

...

In the event that get(key, asOfTimestamp) is called with a timestamp bound older than the specified history retention, instead of returning null (and logging a warning) as proposed above, other design options include (1) throwing an exception or (2) changing the return type from VersionedRecord<V> to Optional<VersionedRecord<V>> and returning an empty optional to indicate that the timestamp bound was invalid. The first option is not very user-friendly. The second option complicates the interface and causes the return types of get(key) and get(key, asOfTimestamp) to diverge.

Consider the edge case where get(key, asOfTimestamp) is called with a timestamp bound older than the specified history retention, but the latest record version for the key satisfies the timestamp bound. The proposal above returns the latest record version in this case, rather than rejecting the timestamped query and returning null. Returning null here (i.e., strictly enforcing the store's history retention) would also be valid. However, the latest record version would still be accessible from get(key). It is therefore more user-friendly for get(key, asOfTimestamp) to return that record than to require the user to decide whether to call get(key) or get(key, asOfTimestamp) to account for this edge case.
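The trade-off can be illustrated with a small, hypothetical comparison over a single key's version history. The history is a `TreeMap` from version timestamp to value, where `null` is a tombstone. `AsOfPolicies` and its methods are illustrative names, not Kafka APIs. For simplicity, stream time and retention are passed in explicitly. `callerWorkaround` shows the extra branching a user would need under strict enforcement:

1. Consult the latest version, roughly what get(key) exposes.
2. Only then fall back to the strict timestamped lookup.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical comparison of two policies for a timestamped lookup whose bound may be
// older than history retention. Not part of Kafka Streams.
final class AsOfPolicies {

    private AsOfPolicies() {}

    // Chosen behavior: outside history retention, still return the latest version
    // if (and only if) it satisfies the bound. A null value is a tombstone.
    static String lenient(TreeMap<Long, String> history, long streamTime, long retention, long asOf) {
        Map.Entry<Long, String> e = history.floorEntry(asOf); // largest timestamp <= asOf
        if (e == null) return null;
        boolean outsideRetention = asOf < streamTime - retention;
        if (outsideRetention && e.getKey() < history.lastKey()) return null;
        return e.getValue();
    }

    // Rejected alternative: strictly return null for any bound older than history retention.
    static String strict(TreeMap<Long, String> history, long streamTime, long retention, long asOf) {
        if (asOf < streamTime - retention) return null;
        Map.Entry<Long, String> e = history.floorEntry(asOf);
        return e == null ? null : e.getValue();
    }

    // What a caller would need under strict enforcement to get the lenient answer:
    // check the latest version first (roughly what get(key) exposes), then fall back.
    static String callerWorkaround(TreeMap<Long, String> history, long streamTime, long retention, long asOf) {
        Map.Entry<Long, String> latest = history.lastEntry(); // null if the history is empty
        if (latest != null && latest.getKey() <= asOf) return latest.getValue();
        return strict(history, streamTime, retention, asOf);
    }
}
```

In this sketch, `callerWorkaround` always agrees with `lenient`. The chosen behavior simply builds that branch into the store, so callers do not have to write it themselves.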

ValueAndTimestamp as return type of get(key, asOfTimestamp) / Additional return timestamps from get(key, asOfTimestamp)

...