Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: MultiVersionedRangeQuery
linenumbers: true
package org.apache.kafka.streams.query;


/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 *
 * @param <K> The key type
 * @param <V> The value type
 */
@Evolving
public final class MultiVersionedRangeQuery<K, V> implements
    Query<KeyValueIterator<K, VersionedRecord<V>>> {

  // Key range bounds; an empty Optional means the corresponding side is unbounded.
  private final Optional<K> lower;
  private final Optional<K> upper;

  // Time range bounds; an empty Optional means the corresponding time point was not specified.
  private final Optional<Instant> fromTimestamp;
  private final Optional<Instant> asOfTimestamp;

  // Ordering flags for the returned records.
  private final boolean isKeyAscending;
  private final boolean isTimeAscending;
  private final boolean isOrderedByKey;

  // Private: instances are obtained through the static factory methods below.
  private MultiVersionedRangeQuery(
      final Optional<K> lower,
      final Optional<K> upper,
      final Optional<Instant> fromTimestamp,
      final Optional<Instant> asOfTimestamp,
      final boolean isOrderedByKey,
      final boolean isKeyAscending,
      final boolean isTimeAscending) {
    this.lower = lower;
    this.upper = upper;
    this.fromTimestamp = fromTimestamp;
    this.asOfTimestamp = asOfTimestamp;
    this.isOrderedByKey = isOrderedByKey;
    this.isKeyAscending = isKeyAscending;
    this.isTimeAscending = isTimeAscending;
  }

  /**
   * Interactive range query using a lower and upper bound to filter the keys returned.
   * For each key the records valid within the specified time range are returned.
   * In case the time range is not specified just the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param upper The key that specifies the upper bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withRange(final K lower, final K upper);

  /**
   * Interactive range query using a lower bound to filter the keys returned.
   * For each key the records valid within the specified time range are returned.
   * In case the time range is not specified just the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withLowerBound(final K lower);

  /**
   * Interactive range query using an upper bound to filter the keys returned.
   * For each key the records valid within the specified time range are returned.
   * In case the time range is not specified just the latest record for each key is returned.
   * @param upper The key that specifies the upper bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withUpperBound(final K upper);

  /**
   * Interactive scan query that returns all records in the store.
   * For each key the records valid within the specified time range are returned.
   * In case the time range is not specified just the latest record for each key is returned.
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withNoBounds();

  /**
   * Specifies the starting time point for the range query. The range query returns all the records
   * that are valid in the time range starting from the timestamp {@code fromTimestamp}.
   * @param fromTimestamp The starting time point
   */
  public MultiVersionedRangeQuery<K, V> from(final Instant fromTimestamp);

  /**
   * Specifies the ending time point for the range query. The range query returns all the records
   * that have timestamp {@code <= asOfTimestamp}.
   * @param asOfTimestamp The ending time point
   */
  public MultiVersionedRangeQuery<K, V> asOf(final Instant asOfTimestamp);

  /**
   * Specifies the starting and ending points of the range query as MIN and MAX respectively.
   * Therefore, the query returns all the existing records in the state store with keys within the
   * specified key range.
   * @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} have been already
   *                          specified.
   */
  public MultiVersionedRangeQuery<K, V> allVersions();

  /**
   * Specifies the overall order of returned records by timestamp.
   */
  public MultiVersionedRangeQuery<K, V> orderByTimestamp();

  /**
   * Specifies the order of keys as descending.
   */
  public MultiVersionedRangeQuery<K, V> withDescendingKeys();

  /**
   * Specifies the order of the timestamps as descending.
   */
  public MultiVersionedRangeQuery<K, V> withDescendingTimestamps();

  /**
   * The lower bound of the query, if specified.
   */
  public Optional<K> lowerKeyBound();

  /**
   * The upper bound of the query, if specified.
   */
  public Optional<K> upperKeyBound();

  /**
   * The starting time point of the query, if specified.
   */
  public Optional<Instant> fromTimestamp();

  /**
   * The ending time point of the query, if specified.
   */
  public Optional<Instant> asOfTimestamp();

  /**
   * @return true if the query orders the returned records by key
   */
  public boolean isOrderedByKey();

  /**
   * @return true if the query returns records in ascending order of keys
   */
  public boolean isKeyAscending();

  /**
   * NOTE(review): the corresponding field is named {@code isTimeAscending}; consider aligning the
   * accessor name with it.
   * @return true if the query returns records in ascending order of timestamps
   */
  public boolean isRangeAscending();

}

...

  • Since this is a completely new set of APIs, no backward compatibility concerns are anticipated. 
  • Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.

Test Plan

The range interactive queries will be tested in the versioned store IQv2 integration test (in the same way as non-versioned range queries).