Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • The methods are composable. The fromTime(Instant fromTime) and toTime(Instant toTime) methods specify the time range.
    • If a user applies the same time limit multiple times such as MultiVersionedKeyQuery.withKey(k).fromTime(t1).fromTime(t2), then the last one wins (it will be translated to MultiVersionedKeyQuery.withKey(k).fromTime(t2)).

    • Defining a query with time range (empty, t1] will be translated into [0, t1] (calling only the toTime(t1) method).
    • Defining a query with time range [t1, empty) will be translated into [t1, MAX) (calling only the fromTime(t1) method).
    • A query with no specified time range will be translated into [0, MAX). It means that the query will return all the versions of the records with specified key.
  • As explained in the javadocs, the query returns all valid records within the specified time range.
    • The fromTime specifies the starting point. There can be records which have been inserted before the fromTime and are valid in the time range. For example, if the record (k,v) has been inserted at time=0, it will be returned by the multi versioned key queries with key=k and fromTime>=0. Obviously, if the record (k,v) becomes tombstone at time=2, then the multi versioned key queries with key=k and fromTime>=2 will not return it any more. In this case, the multi versioned key queries with key=k and fromTime<2 will return the record (k,v) validTo=2.
    • The toTime specifies the ending point. Records that have been inserted at toTime are returned by the query as well.
  • No ordering is guaranteed for the results, but the results can be sorted by timestamp (ascendingly in ascending or descendinglydescending order) by calling the corresponding defined methods (withAscendingTimestamps() and withDescendingTimestamps() respectively). 

Code Block
languagejava
firstline1
titleMultiVersionedKeyQuery.java
linenumberstrue
package org.apache.kafka.streams.query;  

/**
 * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
 * No ordering is guaranteed for the results, but the results can be sorted by timestamp (ascendinglyin ascending or descendinglydescending order) by calling the corresponding defined methods.
 *
 *  @param <K> The type of the key.
 *  @param <V> The type of the result returned by this query.
 */

@Evolving
public final class MultiVersionedKeyQuery<K, V> implements Query<VersionedRecordIterator<V>> {

  private final K key;
  private final Optional<Instant> fromTime;
  private final Optional<Instant> toTime;
  private final boolean isAscending;

  private MultiVersionedKeyQuery(
      final K key,
      final Optional<Instant> fromTime,
      final Optional<Instant> toTime,
      final boolean isAscending) {
    this.key = Objects.requireNonNull(key);
    this.fromTime = fromTime;
    this.toTime = toTime;
    this.isAscending = isAscending;
  }     


  /**
   * Creates a query that will retrieve the set of records identified by {@code key} if any exists
   * (or {@code null} otherwise).
   *
   * <p>
   * While the query by default returns the all the record versions of the specified {@code key}, setting
   * the {@code fromTimestamp} (by calling the {@link #fromTime(Instant)} method), and the {@code toTimestamp}
   * (by calling the {@link #toTime(Instant)} method) makes the query to return the record versions associated 
   * to the specified time range.
   *
   * @param key The specified key by the query
   * @param <K> The type of the key
   * @param <V> The type of the value that will be retrieved
   * @throws NullPointerException if @param key is null
   */   

  public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key);

  /**
   * Specifies the starting time point for the key query.
   * <p>
   * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can
   * be records which have been inserted before the {@code fromTime} and are still valid in the query specified time range (the whole time range
   * or even partially). The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}.  
   *
   * @param fromTime The starting time point
   * If {@code fromTime} is null, will be considered as negative infinity, ie, no lower bound
   */
  public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime);

  /**
   * Specifies the ending time point for the key query.
   * The key query returns all the records that have timestamp <= toTime.
   * 
   * @param toTime The ending time point
   * If @param toTime is null, will be considered as positive infinity, ie, no upper bound
   */
  public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime);

  /**
   * Specifies the order of the returned records by the query as descending by timestamp.
   */
  public MultiVersionedKeyQuery<K, V> withDescendingTimestamps();

  /**
   * Specifies the order of the returned records by the query as ascending by timestamp.
   */
  public MultiVersionedKeyQuery<K, V> withAscendingTimestamps();

   /**
   * The key that was specified for this query.
   * The specified {@code key} of the query.
   */
  public K key();

  /**
   * The starting time point of the query, if specified
   * @return The specified {@code fromTime} of the query. 
   */
  public Optional<Instant> fromTime();

  /**
   * The ending time point of the query, if specified
   * @return The specified {@code toTime} of the query. 
   */
  public Optional<Instant> toTime();

  /**
  * The order of the returned records by timestamp. 
  * @return true if the query returns records in ascending order of timestamps
  */
  public boolean isAscending ();
}

...