Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • The methods are composable. The from and asOf methods specify the time range.
    • If a user applies the same time limit multiple times such as MultiVersionedKeyQuery.withKey(k).from(t1).from(t2), then the last one wins (it will be translated to MultiVersionedKeyQuery.withKey(k).from(t2)).

    • Defining a query with time range (empty, t1] will be translated into [0, t1] (calling only the asOf method).
    • Defining a query with time range (t1, empty) will be translated into [t1, MAX) (calling only the from method).
    • A query with no specified time range will be translated into [0, MAX). It means that the query will return all the versions of the records with specified key.
  • As explained in the javadocs, the query returns all valid records within the specified time range.
    • The fromTime (formerly fromTimestamp) specifies the starting point. There can be records which were inserted before fromTime and are still valid in the time range. For example, if the record (k,v) has been inserted at time=0, it will be returned by multi-versioned key queries with key=k and fromTime>=0. Obviously, if the record (k,v) becomes a tombstone at time=2, then multi-versioned key queries with key=k and fromTime>=2 will not return it anymore. In this case, multi-versioned key queries with key=k and fromTime<2 will return the record (k,v) with validTo=2.
    • The toTime (formerly asOfTimestamp) specifies the ending point, and this bound is inclusive: records that have been inserted exactly at toTime are returned by the query as well.
  • By default, the returned records are ordered by ascending timestamp. The method withDescendingTimestamps() reverses the order. The method withAscendingTimestamps() can be used to state the default order explicitly, for code readability.

Code Block
language: java
firstline: 1
title: MultiVersionedKeyQuery.java
linenumbers: true
package org.apache.kafka.streams.query;

/**
 * Interactive query for retrieving a set of records with the same specified key and different
 * timestamps within the specified time range.
 *
 * <p>NOTE(review): this text was taken from a "Versions Compared" diff view, so several public
 * identifiers (e.g. {@code fromfromTime}, {@code asOftoTime}, {@code fromTimestampfromTime()},
 * {@code asOfTimestamptoTime()}) appear to be the old and new names concatenated
 * ({@code from}/{@code fromTime}, {@code asOf}/{@code toTime}, {@code fromTimestamp}/{@code fromTime},
 * {@code asOfTimestamp}/{@code toTime}). They are kept unchanged so the usage examples still match;
 * resolve them to the intended version before publishing.
 */
@Evolving
public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> {

  private final K key;
  // Lower bound of the time range; empty when the caller did not set one.
  private final Optional<Instant> fromTime;
  // Upper (inclusive) bound of the time range; empty when the caller did not set one.
  private final Optional<Instant> toTime;
  // true = ascending timestamp order (the default), false = descending.
  private final boolean isAscending;

  private MultiVersionedKeyQuery(
      final K key,
      final Optional<Instant> fromTime,
      final Optional<Instant> toTime,
      final boolean isAscending) {
    this.key = Objects.requireNonNull(key);
    this.fromTime = fromTime;
    this.toTime = toTime;
    this.isAscending = isAscending;
  }

  /**
   * Creates a query that will retrieve the set of records identified by {@code key} if any exists
   * (or {@code null} otherwise).
   *
   * @param key The key to retrieve
   * @param <K> The type of the key
   * @param <V> The type of the value that will be retrieved
   * @throws NullPointerException if {@code key} is null
   */
  public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key);

  /**
   * Specifies the starting time point for the key query.
   *
   * <p>The key query returns all the records that are still valid in the time range starting from
   * {@code fromTimestampfromTime}. There can be records which were inserted before
   * {@code fromTimestampfromTime} and are valid in the query's time range (for the whole range or
   * only part of it). In fact, the key query returns all the records that did NOT become a
   * tombstone at or before {@code fromTimestampfromTime}.
   *
   * @param fromTimestampfromTime The starting time point
   * @throws NullPointerException if {@code fromTimestampfromTime} is null
   */
  public MultiVersionedKeyQuery<K, V> fromfromTime(final Instant fromTimestampfromTime);

  /**
   * Specifies the ending time point for the key query.
   *
   * <p>The key query returns all the records whose timestamp is less than or equal to
   * {@code asOfTimestamptoTime}; the bound is inclusive.
   *
   * @param asOfTimestamptoTime The ending time point
   * @throws NullPointerException if {@code asOfTimestamptoTime} is null
   */
  public MultiVersionedKeyQuery<K, V> asOftoTime(final Instant asOfTimestamptoTime);

  /**
   * Specifies the order of the returned records by the query as descending by timestamp.
   */
  public MultiVersionedKeyQuery<K, V> withDescendingTimestamps();

  /**
   * Specifies the order of the returned records by the query as ascending by timestamp.
   * Ascending is the default order; calling this method only makes that explicit.
   */
  public MultiVersionedKeyQuery<K, V> withAscendingTimestamps();

  /**
   * @return the key that was specified for this query
   */
  public K key();

  /**
   * @return the starting time point of the query, or an empty {@link Optional} if not specified
   */
  public Optional<Instant> fromTimestampfromTime();

  /**
   * @return the ending time point of the query, or an empty {@link Optional} if not specified
   */
  public Optional<Instant> asOfTimestamptoTime();

  /**
   * @return true if the query returns records in ascending order of timestamps
   */
  public boolean isAscending();
}

...

Code Block
language: java
firstline: 1
title: VersionedKeyValueStore.java
linenumbers: true
package org.apache.kafka.streams.state;

public interface VersionedKeyValueStore<K, V> extends StateStore {
      /**
     * Get the record associated with this key as of the specified timestamp (i.e.,
     * the existing record with the largest timestamp not exceeding the provided
     * timestamp bound).
     *
     * @param key           The key to fetch
     * @param fromTimestampfromTime The timestamp lower bound. The records that have been inserted at 
							or before this timestamp and did not become tombstone at or before 
							this timestamp will be retrieved and returned.
     * @param asOfTimestamptoTime The timestamp bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp (along with the validTo timestamp) of the record associated with this key
     *         as of the provided timestamp, or {@code null} if no such record exists
     *         (including if the provided timestamp bound is older than this store's history
     *         retention time, i.e., the store no longer contains data for the provided
     *         timestamp). 
     * @throws NullPointerException       If null is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K key, long fromTimestampfromTime, long asOfTimestamptoTime);
}

Examples

The following example illustrates the use of the MultiVersionedKeyQuery class to query a versioned state store.
Imagine we have the following records:

...

Code Block
language: java
linenumbers: true
// example 1: MultiVersionedKeyQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedKeyQuery<Integer, Integer> query1 = MultiVersionedKeyQuery.withKey(1);

final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);

final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult1 = kafkaStreams.query(request1);

// Get the results from all partitions
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults1 = versionedKeyResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) {
    // try-with-resources closes the iterator once this partition's records have been printed.
    try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            // Kafka record timestamps are epoch milliseconds, hence ofEpochMilli (not ofEpochSecond).
            final long timestamp = record.timestamp();
            final Long validTo = record.validTo();
            final Integer value = record.value();
            // NOTE(review): assumes validTo() is null for the latest (still valid) version, which the
            // expected output below prints as "now" -- TODO confirm against VersionedRecord.
            final String validTill = validTo == null ? "now" : Instant.ofEpochMilli(validTo).toString();
            System.out.println("value: " + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + validTill);
        }
    }
}
/* the printed output will be
    value: 1, timestamp: 2023-01-01T10:00:00Z, valid till: 2023-01-05T10:00:00Z
    value: 2, timestamp: 2023-01-15T10:00:00Z, valid till: 2023-01-20T10:00:00Z
    value: 3, timestamp: 2023-01-20T10:00:00Z, valid till: now
*/

// example 2: The values of the record with key=1 from 2023-01-17T10:00:00.00Z till 2023-01-25T10:00:00.00Z

MultiVersionedKeyQuery<Integer, Integer> query2 = MultiVersionedKeyQuery.withKey(1);
query2 = query2.fromfromTime(Instant.parse("2023-01-17T10:00:00.00Z")).asOftoTime(Instant.parse("2023-01-25T10:00:00.00Z"));

final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);

final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult2 = kafkaStreams.query(request2);

// Get the results from all partitions
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults2 = versionedKeyResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
    try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            final long timestamp = record.timestamp();
            final Long validTo = record.validTo();
            final Integer value = record.value();
            // NOTE(review): same null-means-"now" assumption as in example 1 -- TODO confirm.
            final String validTill = validTo == null ? "now" : Instant.ofEpochMilli(validTo).toString();
            System.out.println("value: " + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + validTill);
        }
    }
}
/* the printed output will be
    value: 2, timestamp: 2023-01-15T10:00:00Z, valid till: 2023-01-20T10:00:00Z
    value: 3, timestamp: 2023-01-20T10:00:00Z, valid till: now
*/

...