...

  • The methods are composable. As a consequence, meaningless combinations such as withKey(k1).asOf(t1).allVersions() can be written; they throw a RuntimeException (for example, NotSupportedException).
    • A query with time range (empty, t1] is translated into [0, t1].
    • A query with time range [t1, empty) is translated into [t1, MAX).
    • A query with no specified time range is interpreted as a single-key, single-timestamp query that returns the record with the latest timestamp.
  • As explained in the javadocs, the query returns all valid records within the specified time range.
    • The fromTimestamp specifies the starting point. Records that were inserted before fromTimestamp but are still valid within the time range are returned as well.
    • The asOfTimestamp specifies the ending point. Records that were inserted exactly at asOfTimestamp are returned as well.
  • By default, the returned records are ordered by ascending timestamp. The method withDescendingTimestamps() reverses the order.
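The range rules in the list above can be illustrated with a small, self-contained sketch. It does not use the Kafka Streams API: the class ValidityRangeSketch, the Version type, and the query helper are hypothetical stand-ins that operate on an in-memory list. The sketch assumes that a record is valid from its own timestamp until just before the timestamp of the next record for the same key, and it models unspecified bounds as null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ValidityRangeSketch {

    // Hypothetical stand-in for one record version: a value plus its insertion timestamp.
    static final class Version {
        final String value;
        final long timestamp;

        Version(final String value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * Returns the values of all versions that are valid somewhere in [from, asOf].
     * Assumes the versions belong to one key and are sorted by ascending timestamp.
     * A null bound means "not specified"; if both bounds are null, only the latest
     * version is returned.
     */
    static List<String> query(final List<Version> versions,
                              final Long from,
                              final Long asOf,
                              final boolean ascending) {
        if (from == null && asOf == null) {
            return versions.isEmpty()
                ? Collections.<String>emptyList()
                : Collections.singletonList(versions.get(versions.size() - 1).value);
        }
        final long lower = from == null ? 0L : from;              // (empty, t1] becomes [0, t1]
        final long upper = asOf == null ? Long.MAX_VALUE : asOf;  // [t1, empty) becomes [t1, MAX)
        final List<String> result = new ArrayList<>();
        for (int i = 0; i < versions.size(); i++) {
            final long validFrom = versions.get(i).timestamp;
            // Assumption: validity ends (exclusively) where the next version begins.
            final long validTo = i + 1 < versions.size()
                ? versions.get(i + 1).timestamp
                : Long.MAX_VALUE;
            // Inserted at or before the upper bound, and still valid at or after the lower bound.
            if (validFrom <= upper && validTo > lower) {
                result.add(versions.get(i).value);
            }
        }
        if (!ascending) {
            Collections.reverse(result);
        }
        return result;
    }

    public static void main(final String[] args) {
        final List<Version> versions = Arrays.asList(
            new Version("a", 10L), new Version("b", 20L), new Version("c", 30L));
        System.out.println(query(versions, 15L, 25L, true));   // prints [a, b]
        System.out.println(query(versions, null, null, true)); // prints [c]
    }
}
```

In the first call, "a" was inserted at 10, before the lower bound of 15, but it is still valid until "b" arrives at 20, so it is part of the result.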
VersionedKeyQuery.java
package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

/**
 * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
 */
@Evolving
public final class VersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> {

  private final K key;
  private final Optional<Instant> fromTimestamp;
  private final Optional<Instant> asOfTimestamp;
  private final boolean isAscending;

  private VersionedKeyQuery(
      final K key,
      final Optional<Instant> fromTimestamp,
      final Optional<Instant> asOfTimestamp,
      final boolean isAscending) {
    this.key = Objects.requireNonNull(key);
    this.fromTimestamp = fromTimestamp;
    this.asOfTimestamp = asOfTimestamp;
    this.isAscending = isAscending;
  }

  /**
   * Creates a query that will retrieve the set of records identified by {@code key}, if any exist
   * (or {@code null} otherwise).
   * @param key The key to retrieve
   * @param <K> The type of the key
   * @param <V> The type of the value that will be retrieved
   */
  public static <K, V> VersionedKeyQuery<K, V> withKey(final K key);

  /**
   * Specifies the starting time point for the key query.
   * The key query returns all the records that are valid in the time range starting from the timestamp {@code fromTimestamp}.
   * @param fromTimestamp The starting time point
   */
  public VersionedKeyQuery<K, V> from(Instant fromTimestamp);

  /**
   * Specifies the ending time point for the key query.
   * The key query returns all the records whose timestamp is less than or equal to {@code asOfTimestamp}.
   * @param asOfTimestamp The ending time point
   */
  public VersionedKeyQuery<K, V> asOf(Instant asOfTimestamp);

  /**
   * Specifies the starting and ending points of the key query as MIN and MAX respectively.
   * Therefore, the query returns all the existing records in the state store with the specified key.
   * @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} has already been
   * specified.
   */
  public VersionedKeyQuery<K, V> allVersions();

  /**
   * Specifies the order of the returned records by the query as descending by timestamp.
   */
  public VersionedKeyQuery<K, V> withDescendingTimestamps();

  /**
   * The key that was specified for this query.
   */
  public K getKey();

  /**
   * The starting time point of the query, if specified
   */
  public Optional<Instant> getFromTimestamp();

  /**
   * The ending time point of the query, if specified
   */
  public Optional<Instant> getAsOfTimestamp();

  /**
   * @return true if the query returns records in ascending order of timestamps
   */
  public boolean isAscending();
}
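
The listing above gives signatures only. As a rough illustration of how the composable methods and the rejection of meaningless combinations might fit together, here is a minimal, self-contained sketch. The class VersionedKeyQuerySketch is hypothetical and is not the proposed implementation. The choice of IllegalStateException is an assumption (the proposal only says a RuntimeException is thrown), as is the use of epoch milliseconds 0 and Long.MAX_VALUE for MIN and MAX.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in that mirrors the shape of the proposed builder methods.
final class VersionedKeyQuerySketch<K> {

    private final K key;
    private final Optional<Instant> fromTimestamp;
    private final Optional<Instant> asOfTimestamp;
    private final boolean isAscending;

    private VersionedKeyQuerySketch(final K key,
                                    final Optional<Instant> from,
                                    final Optional<Instant> asOf,
                                    final boolean ascending) {
        this.key = Objects.requireNonNull(key);
        this.fromTimestamp = from;
        this.asOfTimestamp = asOf;
        this.isAscending = ascending;
    }

    // No bounds yet, ascending order by default.
    static <K> VersionedKeyQuerySketch<K> withKey(final K key) {
        return new VersionedKeyQuerySketch<>(key, Optional.empty(), Optional.empty(), true);
    }

    // Each method returns a new immutable instance, which keeps the calls chainable.
    VersionedKeyQuerySketch<K> from(final Instant from) {
        return new VersionedKeyQuerySketch<>(key, Optional.of(from), asOfTimestamp, isAscending);
    }

    VersionedKeyQuerySketch<K> asOf(final Instant asOf) {
        return new VersionedKeyQuerySketch<>(key, fromTimestamp, Optional.of(asOf), isAscending);
    }

    VersionedKeyQuerySketch<K> allVersions() {
        // Combining allVersions() with an explicit bound is meaningless, so reject it.
        if (fromTimestamp.isPresent() || asOfTimestamp.isPresent()) {
            throw new IllegalStateException("allVersions() cannot be combined with from() or asOf()");
        }
        // Assumption: MIN and MAX are modelled as epoch milliseconds 0 and Long.MAX_VALUE.
        return new VersionedKeyQuerySketch<>(
            key,
            Optional.of(Instant.ofEpochMilli(0L)),
            Optional.of(Instant.ofEpochMilli(Long.MAX_VALUE)),
            isAscending);
    }

    VersionedKeyQuerySketch<K> withDescendingTimestamps() {
        return new VersionedKeyQuerySketch<>(key, fromTimestamp, asOfTimestamp, false);
    }

    K getKey() {
        return key;
    }

    Optional<Instant> getFromTimestamp() {
        return fromTimestamp;
    }

    Optional<Instant> getAsOfTimestamp() {
        return asOfTimestamp;
    }

    boolean isAscending() {
        return isAscending;
    }
}
```

With this sketch, VersionedKeyQuerySketch.withKey("k1").asOf(t1).withDescendingTimestamps() builds a query with only the upper bound set, while VersionedKeyQuerySketch.withKey("k1").asOf(t1).allVersions() throws.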

...