...

Code Block: MultiVersionedRangeQuery (Java)
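package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;
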
/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving
public final class MultiVersionedRangeQuery<K, V> implements
    Query<KeyValueIterator<K, VersionedRecord<V>>> {

  private final Optional<K> lower;
  private final Optional<K> upper;
  private final Optional<Instant> fromTime;
  private final Optional<Instant> toTime;

  private MultiVersionedRangeQuery(
      final Optional<K> lower,
      final Optional<K> upper,
      final Optional<Instant> fromTime,
      final Optional<Instant> toTime) {
    this.lower = lower;
    this.upper = upper;
    this.fromTime = fromTime;
    this.toTime = toTime;
  }

  /**
   * Interactive range query using a lower and upper bound to filter the keys returned.
   * For each key, the records valid within the specified time range are returned.
   * If the time range is not specified, just the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param upper The key that specifies the upper bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withKeyRange(final K lower, final K upper);


  /**
   * Interactive range query using a lower bound to filter the keys returned.
   * For each key, the records valid within the specified time range are returned.
   * If the time range is not specified, just the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower);

  /**
   * Interactive range query using an upper bound to filter the keys returned.
   * For each key, the records valid within the specified time range are returned.
   * If the time range is not specified, just the latest record for each key is returned.
   * @param upper The key that specifies the upper bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper);

  /**
   * Interactive scan query that returns all records in the store.
   * For each key, the records valid within the specified time range are returned.
   * If the time range is not specified, just the latest record for each key is returned.
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> allKeys();

  /**
   * Specifies the starting time point for the range query. The query returns all the records
   * that are valid in the time range starting from the timestamp {@code fromTime}.
   * @param fromTime The starting time point
   */
  public MultiVersionedRangeQuery<K, V> fromTime(Instant fromTime);

  /**
   * Specifies the ending time point for the range query. The query returns all the records
   * that have a timestamp less than or equal to {@code toTime}.
   * @param toTime The ending time point
   */
  public MultiVersionedRangeQuery<K, V> toTime(Instant toTime);

  /**
   * The lower bound of the query, if specified.
   */
  public Optional<K> lowerKeyBound();

  /**
   * The upper bound of the query, if specified.
   */
  public Optional<K> upperKeyBound();

  /**
   * The starting time point of the query, if specified.
   */
  public Optional<Instant> fromTime();

  /**
   * The ending time point of the query, if specified.
   */
  public Optional<Instant> toTime();

}
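
The listing above declares only signatures. As a minimal sketch of how such an immutable, fluent query object could behave, the following uses a hypothetical stand-in class, RangeQuerySketch, which is not part of Kafka Streams. It assumes that every builder call returns a new instance and that null key bounds are rejected; neither assumption is stated by the KIP.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in for MultiVersionedRangeQuery; NOT the Kafka Streams implementation.
final class RangeQuerySketch<K> {
    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private RangeQuerySketch(final Optional<K> lower, final Optional<K> upper,
                             final Optional<Instant> fromTime, final Optional<Instant> toTime) {
        this.lower = lower;
        this.upper = upper;
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    // Assumption: Optional.of rejects null bounds with a NullPointerException.
    static <K> RangeQuerySketch<K> withKeyRange(final K lower, final K upper) {
        return new RangeQuerySketch<>(Optional.of(lower), Optional.of(upper),
                Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> withLowerKeyBound(final K lower) {
        return new RangeQuerySketch<>(Optional.of(lower), Optional.empty(),
                Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> withUpperKeyBound(final K upper) {
        return new RangeQuerySketch<>(Optional.empty(), Optional.of(upper),
                Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> allKeys() {
        return new RangeQuerySketch<>(Optional.empty(), Optional.empty(),
                Optional.empty(), Optional.empty());
    }

    // Each time-bound setter copies the other fields into a new instance.
    RangeQuerySketch<K> fromTime(final Instant fromTime) {
        return new RangeQuerySketch<>(lower, upper, Optional.of(fromTime), toTime);
    }

    RangeQuerySketch<K> toTime(final Instant toTime) {
        return new RangeQuerySketch<>(lower, upper, fromTime, Optional.of(toTime));
    }

    Optional<K> lowerKeyBound() { return lower; }
    Optional<K> upperKeyBound() { return upper; }
    Optional<Instant> fromTime() { return fromTime; }
    Optional<Instant> toTime() { return toTime; }
}
```

With this sketch, RangeQuerySketch.withKeyRange("a", "f").fromTime(t1).toTime(t2) yields a query with all four bounds set, while RangeQuerySketch.allKeys() leaves every bound empty.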

Another get method is added to the VersionedKeyValueStore interface.

Code Block: VersionedKeyValueStore (Java)
package org.apache.kafka.streams.state;
 
public interface VersionedKeyValueStore<K, V> extends StateStore {
    /**
     * Get the records with keys within the specified key range that are valid within the
     * specified time range.
     *
     * @param lowerKeyBound The key that specifies the lower key bound of the range
     * @param upperKeyBound The key that specifies the upper key bound of the range
     * @param fromTime      The timestamp lower bound. The records that were inserted at or
     *                      before this timestamp and did not become tombstones at or before
     *                      this timestamp will be retrieved and returned.
     * @param toTime        The timestamp upper bound. This bound is inclusive; a record
     *                      with exactly this timestamp is included in the result.
     * @return An iterator over the values and timestamps (along with the validTo timestamps)
     *         of the records with keys within the specified key range and valid within the
     *         specified time range, or {@code null} if no such record exists (including if
     *         the provided timestamp bound is older than this store's history retention
     *         time, i.e., the store no longer contains data for the provided timestamp).
     * @throws NullPointerException       If null is used for lowerKeyBound or upperKeyBound.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    KeyValueIterator<K, VersionedRecord<V>> get(K lowerKeyBound, K upperKeyBound, long fromTime, long toTime);
}
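
To make the time-range semantics of this method concrete, here is a minimal in-memory sketch. The class VersionedRangeSketch and its nested Version type are hypothetical and are not the Kafka Streams store. It assumes a record version is valid from its own timestamp (inclusive) until the timestamp of the next version or tombstone for the same key (exclusive), and that both key bounds are inclusive. The result order (key, then timestamp) is a side effect of the sorted maps used here, not a guarantee made by the KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of a versioned store; NOT the Kafka Streams implementation.
final class VersionedRangeSketch {

    // One record version, valid in [timestamp, validTo).
    static final class Version {
        final String key;
        final String value;
        final long timestamp;
        final long validTo;

        Version(final String key, final String value, final long timestamp, final long validTo) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.validTo = validTo;
        }
    }

    // key -> (timestamp -> value); a null value models a tombstone.
    private final NavigableMap<String, NavigableMap<Long, String>> data = new TreeMap<>();

    void put(final String key, final String value, final long timestamp) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    List<Version> get(final String lowerKeyBound, final String upperKeyBound,
                      final long fromTime, final long toTime) {
        final List<Version> result = new ArrayList<>();
        // Assumption: both key bounds are inclusive.
        for (final Map.Entry<String, NavigableMap<Long, String>> byKey
                : data.subMap(lowerKeyBound, true, upperKeyBound, true).entrySet()) {
            final NavigableMap<Long, String> versions = byKey.getValue();
            for (final Map.Entry<Long, String> v : versions.entrySet()) {
                if (v.getValue() == null) {
                    continue; // tombstones only end the previous version's validity
                }
                final long timestamp = v.getKey();
                final Long next = versions.higherKey(timestamp);
                final long validTo = next == null ? Long.MAX_VALUE : next;
                // Inserted at or before toTime, and still valid after fromTime.
                if (timestamp <= toTime && validTo > fromTime) {
                    result.add(new Version(byKey.getKey(), v.getValue(), timestamp, validTo));
                }
            }
        }
        return result;
    }
}
```

In this model, a record that was overwritten or deleted exactly at fromTime is excluded, which matches the fromTime description above ("did not become tombstones at or before this timestamp").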


Examples

The following example illustrates the use of the MultiVersionedRangeQuery class to query a versioned state store.

...

The range interactive queries will be tested in the versioned store IQv2 integration test (as non-versioned range queries are). Moreover, unit tests will be added wherever needed.

Rejected Alternatives

The initial plan was to provide ordering based on key and/or timestamp. This was removed from the KIP and may be provided by subsequent KIPs based on user demand.

...