...

Motivation

The main goal is to support interactive queries in the presence of versioned state stores (KIP-889) in AK. This KIP is the successor of KIP-960 and KIP-968.
For this KIP, the following query types are considered for implementation:

Range Queries

  1. key-range latest-value query
  2. key-range with lower bound latest-value query
  3. key-range with upper bound latest-value query
  4. all-keys (no bound) latest-value query
  5. key-range query with timestamp (upper) bound
  6. key-range with lower bound with timestamp (upper) bound 
  7. key-range with upper bound with timestamp (upper) bound
  8. all-keys (no bound) with timestamp (upper) bound
  9. key-range query with timestamp range
  10. key-range query with lower bound with timestamp range
  11. key-range query with upper bound with timestamp range
  12. all-keys (no bound) with timestamp range
  13. key-range query all-versions
  14. key-range query with lower bound all-versions
  15. key-range query with upper bound all-versions
  16. all-keys query (no bound) all-versions (entire store)

...

  • The methods are composable. The fromTime and toTime methods specify the time range, while the withLowerKeyBound and withUpperKeyBound methods specify the key bounds.
    • If a user applies the same time limit multiple times, such as MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t1).fromTime(t2), then the last one wins (it will be translated to MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t2)).

    • Defining a query with time range (empty, t1] will be translated into [0, t1]
    • Defining a query with time range (t1, empty) will be translated into [t1, MAX)
    • A query with no specified time range will be translated into [0, MAX). This means that the query will return all the versions of all the records within the specified key range.
  • As explained in the javadocs, the query returns all valid records within the specified time range.
    • The fromTime specifies the starting point. There can be records which have been inserted before the fromTime and are valid in the time range. 
    • The toTime specifies the ending point. Records that have been inserted at toTime are returned by the query as well.
  • No ordering is guaranteed for the returned records.
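The time-range rules in the list above can be sketched in plain Java. This is only an illustration: the class TimeRangeSketch and its methods are hypothetical and not part of Kafka Streams, and it assumes that a version stops being valid exactly at its validTo timestamp (exclusive) while toTime is inclusive, as the bullets describe.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in for illustration; none of these names exist in Kafka Streams.
final class TimeRangeSketch {

    // A missing lower bound widens the range to the earliest point in time.
    static Instant effectiveFrom(Optional<Instant> fromTime) {
        return fromTime.orElse(Instant.EPOCH);
    }

    // A missing upper bound widens the range to the latest point in time.
    static Instant effectiveTo(Optional<Instant> toTime) {
        return toTime.orElse(Instant.MAX);
    }

    // A version inserted at `timestamp` and superseded at `validTo` (empty if it is
    // still the latest version) is valid in the range if it was inserted at or before
    // toTime and was not superseded at or before fromTime.
    static boolean isValidIn(Instant timestamp, Optional<Instant> validTo,
                             Optional<Instant> fromTime, Optional<Instant> toTime) {
        boolean insertedInTime = !timestamp.isAfter(effectiveTo(toTime));
        boolean notYetSuperseded =
                validTo.map(v -> v.isAfter(effectiveFrom(fromTime))).orElse(true);
        return insertedInTime && notYetSuperseded;
    }

    public static void main(String[] args) {
        Optional<Instant> from = Optional.of(Instant.parse("2023-01-17T10:00:00Z"));
        Optional<Instant> to = Optional.of(Instant.parse("2023-01-30T10:00:00Z"));

        // Inserted before fromTime but still valid inside the range.
        System.out.println(isValidIn(Instant.parse("2023-01-15T10:00:00Z"),
                Optional.of(Instant.parse("2023-01-20T10:00:00Z")), from, to));

        // Superseded before the range starts, so it is filtered out.
        System.out.println(isValidIn(Instant.parse("2023-01-01T10:00:00Z"),
                Optional.of(Instant.parse("2023-01-05T10:00:00Z")), from, to));
    }
}
```

The first call covers the case described for fromTime: a record inserted before the starting point can still be valid within the range.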
Code Block
languagejava
titleMultiVersionedRangeQuery
linenumberstrue
package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving
public final class MultiVersionedRangeQuery<K, V> implements
    Query<KeyValueIterator<K, VersionedRecord<V>>> {

  private final Optional<K> lower;
  private final Optional<K> upper;
  private final Optional<Instant> fromTime;
  private final Optional<Instant> toTime;

  private MultiVersionedRangeQuery(
      final Optional<K> lower,
      final Optional<K> upper,
      final Optional<Instant> fromTime,
      final Optional<Instant> toTime) {
    this.lower = lower;
    this.upper = upper;
    this.fromTime = fromTime;
    this.toTime = toTime;
  }

  /**
   * Interactive range query using a lower and upper bound to filter the keys returned. For each
   * key the records valid within the specified time range are returned. In case the time range is
   * not specified just the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param upper The key that specifies the upper bound of the range
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withKeyRange(final K lower, final K upper);

  /**
   * Interactive range query using a lower bound to filter the keys returned. For each key the
   * records valid within the specified time range are returned. In case the time range is not
   * specified just the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower);

  /**
   * Interactive range query using an upper bound to filter the keys returned. For each key the
   * records valid within the specified time range are returned. In case the time range is not
   * specified just the latest record for each key is returned.
   * @param upper The key that specifies the upper bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper);

  /**
   * Interactive scan query that returns all records in the store. For each key the records valid
   * within the specified time range are returned. In case the time range is not specified just
   * the latest record for each key is returned.
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> allKeys();

  /**
   * Specifies the starting time point for the query. The range query returns all the records
   * that are valid in the time range starting from the timestamp {@code fromTime}.
   * @param fromTime The starting time point
   */
  public MultiVersionedRangeQuery<K, V> fromTime(Instant fromTime);

  /**
   * Specifies the ending time point for the query. The range query returns all the records that
   * have timestamp <= {@code toTime}.
   * @param toTime The ending time point
   */
  public MultiVersionedRangeQuery<K, V> toTime(Instant toTime);

  /**
   * The lower bound of the query, if specified.
   */
  public Optional<K> lowerKeyBound();

  /**
   * The upper bound of the query, if specified.
   */
  public Optional<K> upperKeyBound();

  /**
   * The starting time point of the query, if specified.
   */
  public Optional<Instant> fromTime();

  /**
   * The ending time point of the query, if specified.
   */
  public Optional<Instant> toTime();

}
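One way to obtain the composable, "last one wins" behaviour is an immutable class whose bound-setting methods return a new instance. The sketch below is an assumption about how this could look, not the actual implementation: RangeQuerySketch is a hypothetical stand-in that only mirrors the method names of the class above and omits the value type and the Query interface.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in that mirrors the shape of the proposed class; not Kafka code.
final class RangeQuerySketch<K> {
    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private RangeQuerySketch(Optional<K> lower, Optional<K> upper,
                             Optional<Instant> fromTime, Optional<Instant> toTime) {
        this.lower = lower;
        this.upper = upper;
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    static <K> RangeQuerySketch<K> withKeyRange(K lower, K upper) {
        return new RangeQuerySketch<>(Optional.of(lower), Optional.of(upper),
                Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> withLowerKeyBound(K lower) {
        return new RangeQuerySketch<>(Optional.of(lower), Optional.empty(),
                Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> withUpperKeyBound(K upper) {
        return new RangeQuerySketch<>(Optional.empty(), Optional.of(upper),
                Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> allKeys() {
        return new RangeQuerySketch<>(Optional.empty(), Optional.empty(),
                Optional.empty(), Optional.empty());
    }

    // Each call copies the query and replaces one bound, so a repeated call overrides the earlier one.
    RangeQuerySketch<K> fromTime(Instant fromTime) {
        return new RangeQuerySketch<>(lower, upper, Optional.of(fromTime), toTime);
    }

    RangeQuerySketch<K> toTime(Instant toTime) {
        return new RangeQuerySketch<>(lower, upper, fromTime, Optional.of(toTime));
    }

    Optional<K> lowerKeyBound() { return lower; }
    Optional<K> upperKeyBound() { return upper; }
    Optional<Instant> fromTime() { return fromTime; }
    Optional<Instant> toTime() { return toTime; }

    public static void main(String[] args) {
        Instant t1 = Instant.parse("2023-01-01T10:00:00Z");
        Instant t2 = Instant.parse("2023-01-17T10:00:00Z");
        RangeQuerySketch<Integer> query =
                RangeQuerySketch.withLowerKeyBound(1).fromTime(t1).fromTime(t2);
        System.out.println(query.fromTime().get()); // the second fromTime call wins
    }
}
```

Because every method returns a fresh object, a partially built query can be reused as the base for several variants without one affecting another.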



Examples

The following example illustrates the use of the MultiVersionedRangeQuery class to query a versioned state store.

Imagine we have the following records:

put(1, 1, time=2023-01-01T10:00:00.00Z)

put(1, null, time=2023-01-05T10:00:00.00Z)

put(2, 20, time=2023-01-10T10:00:00.00Z)

put(3, 30, time=2023-01-12T10:00:00.00Z)

put(1, 2, time=2023-01-15T10:00:00.00Z)

put(1, 3, time=2023-01-20T10:00:00.00Z)

put(2, 30, time=2023-01-25T10:00:00.00Z)
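To see which versions these puts produce, the following sketch derives the validity interval of each version. It is a simplified model under stated assumptions, not the store implementation: VersionHistorySketch and its members are hypothetical names, puts are assumed to arrive in timestamp order, and a null value is treated as a tombstone that ends the previous version without creating a new one.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model for illustration; not the versioned store implementation.
final class VersionHistorySketch {

    static final class Put {
        final int key;
        final Integer value; // null marks a tombstone
        final Instant time;

        Put(int key, Integer value, String time) {
            this.key = key;
            this.value = value;
            this.time = Instant.parse(time);
        }
    }

    // The seven puts listed above.
    static List<Put> examplePuts() {
        return Arrays.asList(
                new Put(1, 1, "2023-01-01T10:00:00Z"),
                new Put(1, null, "2023-01-05T10:00:00Z"),
                new Put(2, 20, "2023-01-10T10:00:00Z"),
                new Put(3, 30, "2023-01-12T10:00:00Z"),
                new Put(1, 2, "2023-01-15T10:00:00Z"),
                new Put(1, 3, "2023-01-20T10:00:00Z"),
                new Put(2, 30, "2023-01-25T10:00:00Z"));
    }

    // Superseded versions come first, in the order they were closed;
    // the still-valid latest versions follow, sorted by key.
    static List<String> versions(List<Put> puts) {
        Map<Integer, Put> latest = new HashMap<>();
        List<String> result = new ArrayList<>();
        for (Put put : puts) {
            Put previous = latest.remove(put.key);
            if (previous != null) {
                // A newer put (or tombstone) ends the validity of the previous version.
                result.add(describe(previous, put.time.toString()));
            }
            if (put.value != null) {
                latest.put(put.key, put);
            }
        }
        for (Put put : new TreeMap<>(latest).values()) {
            result.add(describe(put, "now"));
        }
        return result;
    }

    static String describe(Put put, String validTo) {
        return put.key + "," + put.value + " valid from " + put.time + " till " + validTo;
    }

    public static void main(String[] args) {
        for (String line : versions(examplePuts())) {
            System.out.println(line);
        }
    }
}
```

In this model key 1 has no valid version between 2023-01-05 and 2023-01-15, because the tombstone ends the first version and the next put only arrives ten days later.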

Code Block
languagejava
linenumberstrue
// example 1: MultiVersionedRangeQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedRangeQuery<Integer, Integer> query1 =
        MultiVersionedRangeQuery.withKeyRange(1, 2);
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 = kafkaStreams.query(request1);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults1 = versionedRangeResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
          final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
          Integer key = record.key;
          Integer value = record.value.value();
          Long timestamp = record.value.timestamp();
          // validTo() is an Optional<Long>; it is empty for the latest version, printed here as "now"
          String validTill = record.value.validTo().map(t -> Instant.ofEpochMilli(t).toString()).orElse("now");
          // record timestamps are epoch milliseconds
          System.out.println("key,value: " + key + "," + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + validTill);
        }
     }
}
/* the printed output will be
    key,value: 1,1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
*/

// example 2: The value of the records with key range (1,2) from 2023-01-17T10:00:00.00Z till 2023-01-30T10:00:00.00Z

MultiVersionedRangeQuery<Integer, Integer> query2 = MultiVersionedRangeQuery.withKeyRange(1, 2);
query2 = query2.fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-30T10:00:00.00Z"));

final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 = kafkaStreams.query(request2);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 = versionedRangeResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
          final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
          Integer key = record.key;
          Integer value = record.value.value();
          Long timestamp = record.value.timestamp();
          // validTo() is an Optional<Long>; it is empty for the latest version, printed here as "now"
          String validTill = record.value.validTo().map(t -> Instant.ofEpochMilli(t).toString()).orElse("now");
          // record timestamps are epoch milliseconds
          System.out.println("key,value: " + key + "," + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + validTill);
        }
     }
}

/* the printed output will be (in no guaranteed order)
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
*/
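Since the query itself guarantees no ordering, a caller that needs a stable order could collect the results and sort them on the client side. The sketch below shows one way to do that and is not part of the KIP: ClientSideOrderingSketch and Row are hypothetical names, and Row simply holds the key, value, and timestamp copied out of each returned record.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical client-side helper for illustration; not part of the KIP.
final class ClientSideOrderingSketch {

    static final class Row {
        final int key;
        final int value;
        final long timestamp; // epoch milliseconds

        Row(int key, int value, String time) {
            this.key = key;
            this.value = value;
            this.timestamp = Instant.parse(time).toEpochMilli();
        }
    }

    // The four rows of example 2, in the order shown in its output comment.
    static List<Row> exampleRows() {
        return Arrays.asList(
                new Row(2, 30, "2023-01-25T10:00:00Z"),
                new Row(1, 3, "2023-01-20T10:00:00Z"),
                new Row(1, 2, "2023-01-15T10:00:00Z"),
                new Row(2, 20, "2023-01-10T10:00:00Z"));
    }

    // Sorts a copy of the rows by ascending key, then by descending timestamp within a key.
    static List<Row> sortByKeyThenNewestFirst(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingInt((Row r) -> r.key)
                .thenComparing(Comparator.comparingLong((Row r) -> r.timestamp).reversed()));
        return sorted;
    }

    public static void main(String[] args) {
        for (Row row : sortByKeyThenNewestFirst(exampleRows())) {
            System.out.println(row.key + "," + row.value + " @ " + Instant.ofEpochMilli(row.timestamp));
        }
    }
}
```

Sorting a copy keeps the original result list untouched, which is convenient when the same results are also needed in arrival order.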


Compatibility, Deprecation, and Migration Plan

...

The range interactive queries will be tested in the versioned store IQv2 integration test (like non-versioned range queries). Moreover, there will be unit tests wherever needed.

Rejected Alternatives

The initial plan was to provide ordering based on key and/or timestamp. This was removed from the KIP and may be provided by subsequent KIPs based on user demand.