...

Range queries are supported by the VersionedRangeQuery class.

  • The methods are composable. Meaningless combinations such as withRange(k1, k2).asOf(t1).allVersions() therefore throw a RuntimeException (for example NotSupportedException).
    • A query with the time range (empty, t1] is translated into [0, t1].
    • A query with the time range (t1, empty) is translated into [t1, MAX).
    • A query with no specified time range is interpreted as a normal range query that returns, for each key, the record with the latest timestamp.
  • As explained in the javadocs, the query returns all valid records within the specified time range.
    • The fromTimestamp specifies the starting point. Records that were inserted before fromTimestamp but are still valid within the time range are returned as well.
    • The asOfTimestamp specifies the ending point. Records that have been inserted at asOfTimestamp are returned by the query as well.
  • By default, the returned records are ordered by key. The method orderByTimestamp() orders them by timestamp instead.
    • Both the key order and the timestamp order are ascending by default. They can be reversed with the methods withDescendingKeys() and withDescendingTimestamps() respectively.
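The time-range rules above can be illustrated with a small in-memory sketch. It is not the store implementation: VersionSketch and TimeRangeSketch are hypothetical names, and the sketch assumes that each version is valid from its insertion timestamp (inclusive) until the next version of the same key is inserted (exclusive).

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for one stored version of a key; not a Kafka Streams type.
final class VersionSketch {
    final String key;
    final String value;
    final Instant validFrom;          // timestamp at which the version was inserted
    final Optional<Instant> validTo;  // empty while this is the latest version (assumption)

    VersionSketch(String key, String value, Instant validFrom, Optional<Instant> validTo) {
        this.key = key;
        this.value = value;
        this.validFrom = validFrom;
        this.validTo = validTo;
    }
}

final class TimeRangeSketch {

    // Returns the versions that are valid at some point within [from, asOf].
    // With neither bound set, only the latest version of each key is returned.
    static List<VersionSketch> select(List<VersionSketch> store,
                                      Optional<Instant> from,
                                      Optional<Instant> asOf) {
        boolean latestOnly = !from.isPresent() && !asOf.isPresent();
        Instant lower = from.orElse(Instant.EPOCH); // (empty, t1] becomes [0, t1]
        Instant upper = asOf.orElse(Instant.MAX);   // (t1, empty) becomes [t1, MAX)

        List<VersionSketch> result = new ArrayList<>();
        for (VersionSketch v : store) {
            if (latestOnly) {
                if (!v.validTo.isPresent()) {
                    result.add(v);
                }
                continue;
            }
            // Inserted at or before the ending point (asOf is inclusive).
            boolean insertedInTime = !v.validFrom.isAfter(upper);
            // Still valid after the starting point, even if inserted before it.
            boolean stillValid = !v.validTo.isPresent() || v.validTo.get().isAfter(lower);
            if (insertedInTime && stillValid) {
                result.add(v);
            }
        }
        return result;
    }
}
```

With versions inserted at 10, 20 and 30, a query from 15 as of 30 would select all three under these assumptions: the first because it is still valid at 15, the last because asOf is inclusive.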
VersionedRangeQuery
package org.apache.kafka.streams.query;


/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving
public final class VersionedRangeQuery<K, V> implements
    Query<KeyValueIterator<K, VersionedRecord<V>>> {

  private final Optional<K> lower;
  private final Optional<K> upper;
  private final Optional<Instant> fromTimestamp;

  private final Optional<Instant> asOfTimestamp;
  private final boolean isKeyAscending;
  private final boolean isTimeAscending;
  private final boolean isOrderedByKey;

  private VersionedRangeQuery(
      final Optional<K> lower,
      final Optional<K> upper,
      final Optional<Instant> fromTimestamp,
      final Optional<Instant> asOfTimestamp,
      final boolean isOrderedByKey,
      final boolean isKeyAscending,
      final boolean isTimeAscending) {
    this.lower = lower;
    this.upper = upper;
    this.fromTimestamp = fromTimestamp;
    this.asOfTimestamp = asOfTimestamp;
    this.isOrderedByKey = isOrderedByKey;
    this.isKeyAscending = isKeyAscending;
    this.isTimeAscending = isTimeAscending;
  }

  /**
   * Interactive range query using a lower and upper bound to filter the keys returned. For each
   * key, the records valid within the specified time range are returned. If the time range is
   * not specified, only the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param upper The key that specifies the upper bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> VersionedRangeQuery<K, V> withRange(final K lower, final K upper);

  /**
   * Interactive range query using a lower bound to filter the keys returned. For each key, the
   * records valid within the specified time range are returned. If the time range is not
   * specified, only the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> VersionedRangeQuery<K, V> withLowerBound(final K lower);

  /**
   * Interactive range query using an upper bound to filter the keys returned. For each key, the
   * records valid within the specified time range are returned. If the time range is not
   * specified, only the latest record for each key is returned.
   * @param upper The key that specifies the upper bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> VersionedRangeQuery<K, V> withUpperBound(final K upper);

  /**
   * Interactive scan query that returns all records in the store. For each key, the records valid
   * within the specified time range are returned. If the time range is not specified, only
   * the latest record for each key is returned.
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> VersionedRangeQuery<K, V> withNoBounds();

  /**
   * Specifies the starting time point for the range query. The range query returns all the records
   * that are valid in the time range starting from the timestamp {@code fromTimestamp}.
   * @param fromTimestamp The starting time point
   */
  public VersionedRangeQuery<K, V> from(Instant fromTimestamp);

  /**
   * Specifies the ending time point for the range query. The range query returns all the records
   * whose timestamp is less than or equal to {@code asOfTimestamp}.
   * @param asOfTimestamp The ending time point
   */
  public VersionedRangeQuery<K, V> asOf(Instant asOfTimestamp);

  /**
   * Specifies the starting and ending points of the range query as MIN and MAX respectively.
   * Therefore, the query returns all the existing records in the state store with keys within the
   * specified key range.
   * @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} has already been
   *                          specified.
   */
  public VersionedRangeQuery<K, V> allVersions();

  /**
   * Orders the returned records by timestamp instead of by key.
   */
  public VersionedRangeQuery<K, V> orderByTimestamp();

  /**
   * Specifies the order of keys as descending.
   */
  public VersionedRangeQuery<K, V> withDescendingKeys();

  /**
   * Specifies the order of the timestamps as descending.
   */
  public VersionedRangeQuery<K, V> withDescendingTimestamps();

  /**
   * The lower bound of the query, if specified.
   */
  public Optional<K> getLowerKeyBound();

  /**
   * The upper bound of the query, if specified.
   */
  public Optional<K> getUpperKeyBound();

  /**
   * The starting time point of the query, if specified.
   */
  public Optional<Instant> getFromTimestamp();

  /**
   * The ending time point of the query, if specified.
   */
  public Optional<Instant> getAsOfTimestamp();

  /**
   * @return true if the query orders the returned records by key
   */
  public boolean isOrderedByKey();

  /**
   * @return true if the query returns records in ascending order of keys
   */
  public boolean isKeyAscending();

  /**
   * @return true if the query returns records in ascending order of timestamps
   */
  public boolean isTimeAscending();

}
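
The composability rule can be sketched with a reduced, self-contained model of the class above. RangeQuerySketch is a hypothetical stand-in and not the proposed implementation: key bounds are omitted, each method returns a new immutable instance, and UnsupportedOperationException is used as one possible RuntimeException.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical, simplified model of the proposed query class (key bounds omitted).
final class RangeQuerySketch {
    private final Optional<Instant> from;
    private final Optional<Instant> asOf;
    private final boolean orderedByKey;
    private final boolean keyAscending;
    private final boolean timeAscending;

    private RangeQuerySketch(Optional<Instant> from, Optional<Instant> asOf,
                             boolean orderedByKey, boolean keyAscending,
                             boolean timeAscending) {
        this.from = from;
        this.asOf = asOf;
        this.orderedByKey = orderedByKey;
        this.keyAscending = keyAscending;
        this.timeAscending = timeAscending;
    }

    // Defaults: no time range, ordered by key, both orders ascending.
    static RangeQuerySketch withNoBounds() {
        return new RangeQuerySketch(Optional.empty(), Optional.empty(), true, true, true);
    }

    RangeQuerySketch from(Instant fromTimestamp) {
        return new RangeQuerySketch(Optional.of(fromTimestamp), asOf,
                orderedByKey, keyAscending, timeAscending);
    }

    RangeQuerySketch asOf(Instant asOfTimestamp) {
        return new RangeQuerySketch(from, Optional.of(asOfTimestamp),
                orderedByKey, keyAscending, timeAscending);
    }

    // Sets the time range to [MIN, MAX]; rejected if a time bound was already given.
    RangeQuerySketch allVersions() {
        if (from.isPresent() || asOf.isPresent()) {
            throw new UnsupportedOperationException(
                    "allVersions() cannot be combined with from() or asOf()");
        }
        return new RangeQuerySketch(Optional.of(Instant.MIN), Optional.of(Instant.MAX),
                orderedByKey, keyAscending, timeAscending);
    }

    RangeQuerySketch orderByTimestamp() {
        return new RangeQuerySketch(from, asOf, false, keyAscending, timeAscending);
    }

    RangeQuerySketch withDescendingKeys() {
        return new RangeQuerySketch(from, asOf, orderedByKey, false, timeAscending);
    }

    RangeQuerySketch withDescendingTimestamps() {
        return new RangeQuerySketch(from, asOf, orderedByKey, keyAscending, false);
    }

    Optional<Instant> getFromTimestamp() { return from; }
    Optional<Instant> getAsOfTimestamp() { return asOf; }
    boolean isOrderedByKey() { return orderedByKey; }
    boolean isKeyAscending() { return keyAscending; }
    boolean isTimeAscending() { return timeAscending; }
}
```

In this sketch, RangeQuerySketch.withNoBounds().asOf(t1).allVersions() throws, while RangeQuerySketch.withNoBounds().allVersions().orderByTimestamp() builds a query over the full time range ordered by timestamp.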

...