Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • The methods are composable. Therefore, the meaningless combinations such as withKey(k1).asOf(t1).allVersions() end up throwing a RunTimeException (for example NotSupportedException).
    • Defining a query with time range (empty, t1] will be translated into [0, t1]
    • Defining a query with time range (t1, empty) will be translated into [t1, MAX)
    • A query with no specified time range will be interpreted as single-key_single-timestamp that returns the record with the latest timestamp.
  • As explained in the javadocs, the query returns all valid records within the specified time range.
    • The fromTimestamp specifies the starting point. There can be records which have been inserted before the fromTimestamp and are valid in the time range.  For example, if the record (k,v) has been inserted at time=0, it will be returned by the multi versioned key queries with key=k and fromTimestamp>=0. Obviously, if the record (k,v) becomes tombstone at time=2, then the multi versioned key queries with key=k and fromTimestamp>=2 will not return it any more. In this case, the multi versioned key queries with key=k and fromTimestamp<2 will return the record (k,v) validTo=2.
    • The asOfTimestamp specifies the ending point. Records that have been inserted at asOfTimestamp are returned by the query as well.
  • The order of the returned records is by default ascending by timestamp. The method withDescendingTimestamps() can reverse the order.
Code Block
languagejava
firstline1
titleMultiVersionedKeyQuery.java
linenumberstrue
package org.apache.kafka.streams.query;

/**
 * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
 */
@Evolving
public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> {

  private final K key;
  private final Optional<Instant> fromTimestamp;
  private final Optional<Instant> asOfTimestamp;
  private final boolean isAscending;

  private MultiVersionedKeyQuery(
      final K key,
      Optional<Instant> fromTimestamp,
      Optional<Instant> asOfTimestamp,
      boolean isAscending) {
    this.key = Objects.requireNonNull(key);
    this.fromTimestamp = fromTimestamp;
    this.asOfTimestamp = asOfTimestamp;
    this.isAscending = isAscending;
  }

  /**
   * Creates a query that will retrieve the set of records identified by {@code key} if any exists
   * (or {@code null} otherwise).
   * @param key The key to retrieve
   * @throws NullPointerException if @param key is null           
   * @param <K> The type of the key
   * @param <V> The type of the value that will be retrieved
   */
  public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key);

  /**
   * Specifies the starting time point for the key query.
   * <pre>
   * The key query returns all the records that are still validexisting in the time range starting from the timestamp {@code fromTimestamp}. There can be records which have been inserted before the {@code fromTimestamp} 
   * and are valid in the query specified time range (the whole time range or even partially). The key query in fact returns all the records that have NOT became tombstone at or after {@code fromTimestamp}.  
   * </pre>  
   * @param fromTimestamp The starting time point
   * @throws NullPointerException if @param fromTimestamp is null  
   */
  public MultiVersionedKeyQuery<K, V> from(Instant fromTimestamp);

  /**
   * Specifies the ending time point for the key query.
   * The key query returns all the records that have timestamp <= {@code asOfTimestamp}.
   * @param asOfTimestamp The ending time point
   * @throws NullPointerException if @param asOfTimestamp is null  
  */
  public MultiVersionedKeyQuery<K, V> asOf(Instant asOfTimestamp);

  /**
   * Specifies the starting and ending points of the key query as MIN and MAX respectively.
   * Therefore, the query returns all the existing records in the state store with the specified key.
   * @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} have been already
   * specified.
   */
  public MultiVersionedKeyQuery<K, V> allVersions();

  /**
   * Specifies the order of the returned records by the query as descending by timestamp.
   */
  public MultiVersionedKeyQuery<K, V> withDescendingTimestamps();

  /**
   * The key that was specified for this query.
   */
  public K key();

  /**
   * The starting time point of the query, if specified
   */
  public Optional<Instant> fromTimestamp();

  /**
   * The ending time point of the query, if specified
   */
  public Optional<Instant> asOfTimestamp();

  /**
   * @return true if the query returns records in ascending order of timestamps
   */
  public boolean isAscending ();
}

...