Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Public Interfaces

In this KIP

  • we propose a public classes, MultiVersionedKeyQuery.
  • Moreover, the public interface ValueIterator is added to iterate over different values that are returned from a single-key query (each value corresponds to a timestamp). 
  • In addition, a new method is added to the VersionedKeyValueStore interface to support single-key_multi-timestamp queries.

Proposed Changes

To be able to list the tombstones, the VersionedRecord class can accept NULL values as well.

...

Code Block
languagejava
firstline1
titleMultiVersionedKeyQuery.java
linenumberstrue
package org.apache.kafka.streams.query;

/**
 * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
 */
@Evolving
public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> {

  private final K key;
  private final Optional<Instant> fromTimestamp;
  private final Optional<Instant> asOfTimestamp;
  private final boolean isAscending;

  private MultiVersionedKeyQuery(
      final K key,
      Optional<Instant> fromTimestamp,
      Optional<Instant> asOfTimestamp,
      boolean isAscending) {
    this.key = Objects.requireNonNull(key);
    this.fromTimestamp = fromTimestamp;
    this.asOfTimestamp = asOfTimestamp;
    this.isAscending = isAscending;
  }

  /**
   * Creates a query that will retrieve the set of records identified by {@code key} if any exists
   * (or {@code null} otherwise).
   * @param key The key to retrieve
   * @throws NullPointerException if @param key is null           
   * @param <K> The type of the key
   * @param <V> The type of the value that will be retrieved
   */
  public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key);

  /**
   * Specifies the starting time point for the key query.
   * <pre>
   * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTimestamp}. There can be records which have been inserted before the {@code fromTimestamp} 
   * and are valid in the query specified time range (the whole time range or even partially). The key query in fact returns all the records that have NOT became tombstone at or after {@code fromTimestamp}.  
   * </pre>  
   * @param fromTimestamp The starting time point
   * @throws NullPointerException if @param fromTimestamp is null  
   */
  public MultiVersionedKeyQuery<K, V> from(Instant fromTimestamp);

  /**
   * Specifies the ending time point for the key query.
   * The key query returns all the records that have timestamp <= {@code asOfTimestamp}.
   * @param asOfTimestamp The ending time point
   * @throws NullPointerException if @param asOfTimestamp is null  
  */
  public MultiVersionedKeyQuery<K, V> asOf(Instant asOfTimestamp);

  /**
   * Specifies the order of the returned records by the query as descending by timestamp.
   */
  public MultiVersionedKeyQuery<K, V> withDescendingTimestamps();

  /**
   * The key that was specified for this query.
   */
  public K key();

  /**
   * The starting time point of the query, if specified
   */
  public Optional<Instant> fromTimestamp();

  /**
   * The ending time point of the query, if specified
   */
  public Optional<Instant> asOfTimestamp();

  /**
   * @return true if the query returns records in ascending order of timestamps
   */
  public boolean isAscending ();
}


VersionedKeyValueStore interface

Code Block
languagejava
firstline1
titleVersionedKeyValueStore.java
linenumberstrue
collapsetrue
package org.apache.kafka.streams.state;
     /**
     * Get the record associated with this key as of the specified timestamp (i.e.,
     * the existing record with the largest timestamp not exceeding the provided
     * timestamp bound).
     *
     * @param key           The key to fetch
     * @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp of the record associated with this key
     *         as of the provided timestamp, or {@code null} if no such record exists
     *         (including if the provided timestamp bound is older than this store's history
     *         retention time, i.e., the store no longer contains data for the provided
     *         timestamp). Note that the record timestamp {@code r.timestamp()} of the
     *         returned {@link VersionedRecord} may be smaller than the provided timestamp
     *         bound. Additionally, if the latest record version for the key is eligible
     *         for the provided timestamp bound, then that record will be returned even if
     *         the timestamp bound is older than the store's history retention.
     * @throws NullPointerException       If null is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K key, long asOfTimestamp);

Examples

The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.

...

  • Since this is a completely new set of APIs, no backward compatibility concerns are anticipated. 
  • Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.

Test Plan

The single-key_multi-timestamp interactive queries will be tested in versioned stored IQv2 integration test (like non-versioned key queries).