...

Motivation

The main goal is to support interactive queries in the presence of versioned state stores (KIP-889) in AK. This KIP is the successor of KIP-960 and KIP-968.
For this KIP, the following query types are considered for implementation:

Range Queries

  1. key-range latest-value query
  2. key-range with lower bound latest-value query
  3. key-range with upper bound latest-value query
  4. all-keys (no bound) latest-value query
  5. key-range query with timestamp (upper) bound
  6. key-range with lower bound with timestamp (upper) bound 
  7. key-range with upper bound with timestamp (upper) bound
  8. all-keys (no bound) with timestamp (upper) bound
  9. key-range query with timestamp range
  10. key-range query with lower bound with timestamp range
  11. key-range query with upper bound with timestamp range
  12. all-keys (no bound) with timestamp range
  13. key-range query all-versions
  14. key-range query with lower bound all-versions
  15. key-range query with upper bound all-versions
  16. all-keys query (no bound) all-versions (entire store)
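The sixteen query types above are the cross product of four key-bound shapes (key range, lower bound only, upper bound only, all keys) and four time-bound shapes (latest value, timestamp upper bound, timestamp range, all versions). The Java sketch below only enumerates that structure in the list's order. The enum and class names are hypothetical labels and are not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical labels for the four key-bound shapes in the list above.
enum KeyBound { KEY_RANGE, LOWER_BOUND, UPPER_BOUND, ALL_KEYS }

// Hypothetical labels for the four time-bound shapes in the list above.
enum TimeBound { LATEST_VALUE, TIMESTAMP_UPPER_BOUND, TIMESTAMP_RANGE, ALL_VERSIONS }

final class QueryTypes {
    // Time shape is the outer loop and key shape the inner loop, matching items 1-16.
    static List<String> enumerate() {
        final List<String> types = new ArrayList<>();
        for (final TimeBound time : TimeBound.values()) {
            for (final KeyBound key : KeyBound.values()) {
                types.add(key + " / " + time);
            }
        }
        return types;
    }

    public static void main(final String[] args) {
        final List<String> types = enumerate();
        for (int i = 0; i < types.size(); i++) {
            System.out.println((i + 1) + ". " + types.get(i));
        }
    }
}
```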

...

  • The methods are composable. The fromTime and toTime methods specify the time range, while the withLowerKeyBound and withUpperKeyBound methods specify the key bounds.
    • If a user applies the same time limit multiple times, as in MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t1).fromTime(t2), the last one wins (the query is translated to MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t2)).

    • Defining a query with time range (empty, t1] will be translated into [0, t1]
    • Defining a query with time range (t1, empty) will be translated into [t1, MAX)
    • A query with no specified time range will be translated into [0, MAX), i.e., the query returns all versions of all records whose keys fall within the specified key range.
  • As explained in the javadocs, the query returns all records that are valid at some point within the specified time range.
    • fromTime specifies the starting point. Records inserted before fromTime are also returned if they are still valid at fromTime.
    • toTime specifies the ending point and is inclusive: records inserted exactly at toTime are returned as well.
  • No ordering is guaranteed for the returned records.
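The time-range rules above can be modelled in a few lines. The sketch below is a hypothetical, self-contained model. The class `TimeRangeSketch` and its methods are illustrative names, not the proposed API, and timestamps are plain epoch milliseconds.

The model works as follows:
- Unspecified bounds default to 0 and `Long.MAX_VALUE`.
- Each call returns a new object, so a repeated call overwrites the earlier one.
- A version whose validity interval is [timestamp, validTo) is returned when that interval overlaps the inclusive query range [fromTime, toTime].

Treating validTo as exclusive is an assumption of this sketch.

```java
import java.util.Optional;

// Hypothetical model of the time-range semantics described above; not the proposed API.
final class TimeRangeSketch {
    private final Optional<Long> fromTime;
    private final Optional<Long> toTime;

    private TimeRangeSketch(final Optional<Long> fromTime, final Optional<Long> toTime) {
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    static TimeRangeSketch unbounded() {
        return new TimeRangeSketch(Optional.empty(), Optional.empty());
    }

    // Each call returns a new instance, so a later call overwrites an earlier one ("last one wins").
    TimeRangeSketch fromTime(final long from) {
        return new TimeRangeSketch(Optional.of(from), toTime);
    }

    TimeRangeSketch toTime(final long to) {
        return new TimeRangeSketch(fromTime, Optional.of(to));
    }

    // Unspecified bounds are translated into [0, Long.MAX_VALUE].
    long effectiveFrom() {
        return fromTime.orElse(0L);
    }

    long effectiveTo() {
        return toTime.orElse(Long.MAX_VALUE);
    }

    // Assumption: a version is valid in [timestamp, validTo); it matches if that interval overlaps [from, to].
    boolean matches(final long timestamp, final long validTo) {
        return timestamp <= effectiveTo() && validTo > effectiveFrom();
    }

    public static void main(final String[] args) {
        final TimeRangeSketch range = unbounded().fromTime(10).fromTime(20).toTime(30);
        System.out.println(range.effectiveFrom() + ".." + range.effectiveTo()); // 20..30
        System.out.println(range.matches(5, 25));              // inserted before fromTime, still valid: true
        System.out.println(range.matches(30, Long.MAX_VALUE)); // inserted exactly at toTime: true
        System.out.println(range.matches(5, 20));              // expired exactly at fromTime: false
    }
}
```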
```java
package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving
public final class MultiVersionedRangeQuery<K, V> implements
    Query<KeyValueIterator<K, VersionedRecord<V>>> {

  private final Optional<K> lower;
  private final Optional<K> upper;
  private final Optional<Instant> fromTime;
  private final Optional<Instant> toTime;

  private MultiVersionedRangeQuery(
      final Optional<K> lower,
      final Optional<K> upper,
      final Optional<Instant> fromTime,
      final Optional<Instant> toTime) {
    this.lower = lower;
    this.upper = upper;
    this.fromTime = fromTime;
    this.toTime = toTime;
  }

  /**
   * Interactive range query using a lower and upper bound to filter the keys returned.
   * For each key, the records valid within the specified time range are returned.
   * If the time range is not specified, all versions of each key are returned.
   * @param lower The key that specifies the lower bound of the range
   * @param upper The key that specifies the upper bound of the range
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withKeyRange(final K lower, final K upper);

  /**
   * Interactive range query using a lower bound to filter the keys returned.
   * For each key, the records valid within the specified time range are returned.
   * If the time range is not specified, all versions of each key are returned.
   * @param lower The key that specifies the lower bound of the range
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower);

  /**
   * Interactive range query using an upper bound to filter the keys returned.
   * For each key, the records valid within the specified time range are returned.
   * If the time range is not specified, all versions of each key are returned.
   * @param upper The key that specifies the upper bound of the range
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper);

  /**
   * Interactive scan query that returns all records in the store.
   * For each key, the records valid within the specified time range are returned.
   * If the time range is not specified, all versions of each key are returned.
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> allKeys();

  /**
   * Specifies the starting time point for the key query. The range query returns all the records
   * that are valid in the time range starting from the timestamp {@code fromTime}.
   * @param fromTime The starting time point
   */
  public MultiVersionedRangeQuery<K, V> fromTime(Instant fromTime);

  /**
   * Specifies the ending time point for the key query. The range query returns all the records that
   * have a timestamp less than or equal to {@code toTime}.
   * @param toTime The ending time point
   */
  public MultiVersionedRangeQuery<K, V> toTime(Instant toTime);

  /**
   * The lower bound of the query, if specified.
   */
  public Optional<K> lowerKeyBound();

  /**
   * The upper bound of the query, if specified.
   */
  public Optional<K> upperKeyBound();

  /**
   * The starting time point of the query, if specified.
   */
  public Optional<Instant> fromTime();

  /**
   * The ending time point of the query, if specified.
   */
  public Optional<Instant> toTime();
}
```
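The optional key bounds exposed by lowerKeyBound() and upperKeyBound() can be read as follows: an empty bound means the range is open on that side. The Java sketch below applies that reading to a sorted in-memory map using only java.util. The class and method names are hypothetical. The sketch also assumes both key bounds are inclusive, which the proposal does not state explicitly.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical helper: filters a sorted map by optional, inclusive key bounds.
final class KeyBoundsSketch {
    // As written, TreeMap throws IllegalArgumentException if upper < lower.
    static <K, V> NavigableMap<K, V> filter(
            final NavigableMap<K, V> store, final Optional<K> lower, final Optional<K> upper) {
        NavigableMap<K, V> view = store;
        if (lower.isPresent()) {
            view = view.tailMap(lower.get(), true); // keep keys >= lower
        }
        if (upper.isPresent()) {
            view = view.headMap(upper.get(), true); // keep keys <= upper
        }
        return view;
    }

    public static void main(final String[] args) {
        final NavigableMap<Integer, String> store = new TreeMap<>(Map.of(1, "a", 2, "b", 3, "c"));
        System.out.println(filter(store, Optional.of(1), Optional.of(2)).keySet());     // like withKeyRange(1, 2): [1, 2]
        System.out.println(filter(store, Optional.of(2), Optional.empty()).keySet());   // like withLowerKeyBound(2): [2, 3]
        System.out.println(filter(store, Optional.empty(), Optional.of(1)).keySet());   // like withUpperKeyBound(1): [1]
        System.out.println(filter(store, Optional.empty(), Optional.empty()).keySet()); // like allKeys(): [1, 2, 3]
    }
}
```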


Examples

The following examples illustrate the use of the MultiVersionedRangeQuery class to query a versioned state store.

Imagine we have the following records, where put(key, value, time) writes a record and a null value is a tombstone:

put(1, 1, time=2023-01-01T10:00:00.00Z)

put(1, null, time=2023-01-05T10:00:00.00Z)

put(2, 20, time=2023-01-10T10:00:00.00Z)

put(3, 30, time=2023-01-12T10:00:00.00Z)

put(1, 2, time=2023-01-15T10:00:00.00Z)

put(1, 3, time=2023-01-20T10:00:00.00Z)

put(2, 30, time=2023-01-25T10:00:00.00Z)
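The "valid till" values printed in the examples follow from replaying these puts into a per-key version history:
- each put ends the validity of the current version of the same key;
- a tombstone (null value) only ends it and adds no new version.

The Java sketch below models that replay. The class and record names are hypothetical, timestamps are epoch milliseconds, and `Long.MAX_VALUE` stands for "valid till now". It is an illustration, not Kafka's implementation.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a versioned store's history; not Kafka's implementation.
final class VersionHistorySketch {
    record Version(int value, long timestamp, long validTo) { }

    private final Map<Integer, List<Version>> history = new TreeMap<>();

    // Assumes puts arrive in timestamp order, as in the example above.
    void put(final int key, final Integer value, final String time) {
        final long ts = Instant.parse(time).toEpochMilli();
        final List<Version> versions = history.computeIfAbsent(key, k -> new ArrayList<>());
        if (!versions.isEmpty()) {
            final Version last = versions.get(versions.size() - 1);
            if (last.validTo() == Long.MAX_VALUE) {
                // The new put (value or tombstone) ends the validity of the current version.
                versions.set(versions.size() - 1, new Version(last.value(), last.timestamp(), ts));
            }
        }
        if (value != null) {
            versions.add(new Version(value, ts, Long.MAX_VALUE));
        }
    }

    List<Version> versions(final int key) {
        return history.getOrDefault(key, List.of());
    }

    // Replays the puts listed above.
    static VersionHistorySketch example() {
        final VersionHistorySketch store = new VersionHistorySketch();
        store.put(1, 1, "2023-01-01T10:00:00.00Z");
        store.put(1, null, "2023-01-05T10:00:00.00Z");
        store.put(2, 20, "2023-01-10T10:00:00.00Z");
        store.put(3, 30, "2023-01-12T10:00:00.00Z");
        store.put(1, 2, "2023-01-15T10:00:00.00Z");
        store.put(1, 3, "2023-01-20T10:00:00.00Z");
        store.put(2, 30, "2023-01-25T10:00:00.00Z");
        return store;
    }

    public static void main(final String[] args) {
        final VersionHistorySketch store = example();
        for (final int key : new int[] {1, 2}) {
            for (final Version v : store.versions(key)) {
                final String till = v.validTo() == Long.MAX_VALUE
                        ? "now" : Instant.ofEpochMilli(v.validTo()).toString();
                System.out.println("key,value: " + key + "," + v.value()
                        + ", timestamp: " + Instant.ofEpochMilli(v.timestamp()) + ", valid till: " + till);
            }
        }
    }
}
```

Running main prints the same five versions as example 1, using Instant's default formatting (for example 2023-01-01T10:00:00Z).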

```java
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;

import java.time.Instant;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.query.MultiVersionedRangeQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

// example 1: a MultiVersionedRangeQuery without any time bound is interpreted as "all versions"
final MultiVersionedRangeQuery<Integer, Integer> query1 =
        MultiVersionedRangeQuery.withKeyRange(1, 2);
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 =
        kafkaStreams.query(request1);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults =
        versionedRangeResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            final Long timestamp = record.value.timestamp();
            final Long validTo = record.value.validTo();
            // Record timestamps are epoch milliseconds.
            System.out.println("key,value: " + key + "," + value
                    + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                    + ", valid till: " + Instant.ofEpochMilli(validTo));
        }
    }
}
/* the printed output will be
    key,value: 1,1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
*/
```

```java
// example 2: the records with keys in the range [1, 2] that are valid between
// 2023-01-17T10:00:00.00Z and 2023-01-30T10:00:00.00Z
final MultiVersionedRangeQuery<Integer, Integer> query2 =
        MultiVersionedRangeQuery.<Integer, Integer>withKeyRange(1, 2)
                .fromTime(Instant.parse("2023-01-17T10:00:00.00Z"))
                .toTime(Instant.parse("2023-01-30T10:00:00.00Z"));

final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 =
        kafkaStreams.query(request2);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 =
        versionedRangeResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            final Long timestamp = record.value.timestamp();
            final Long validTo = record.value.validTo();
            // Record timestamps are epoch milliseconds.
            System.out.println("key,value: " + key + "," + value
                    + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                    + ", valid till: " + Instant.ofEpochMilli(validTo));
        }
    }
}

/* the printed output will be (no ordering is guaranteed)
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
*/
```
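The query guarantees no ordering, and the order printed in example 2 reflects that. A caller that needs a deterministic order can sort the collected results itself. The Java sketch below sorts example 2's four records by key and then by timestamp. The `Row` record is a hypothetical stand-in for KeyValue<Integer, VersionedRecord<Integer>>, used so that the sketch runs without Kafka on the classpath.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ClientSideOrdering {
    // Hypothetical stand-in for KeyValue<Integer, VersionedRecord<Integer>>.
    record Row(int key, int value, long timestamp) { }

    // Returns a sorted copy; the input list is left untouched.
    static List<Row> sortByKeyThenTimestamp(final List<Row> rows) {
        final List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingInt(Row::key).thenComparingLong(Row::timestamp));
        return sorted;
    }

    static long ms(final String time) {
        return Instant.parse(time).toEpochMilli();
    }

    public static void main(final String[] args) {
        // The rows in the order example 2 happened to print them.
        final List<Row> unordered = List.of(
                new Row(2, 30, ms("2023-01-25T10:00:00Z")),
                new Row(1, 3, ms("2023-01-20T10:00:00Z")),
                new Row(1, 2, ms("2023-01-15T10:00:00Z")),
                new Row(2, 20, ms("2023-01-10T10:00:00Z")));
        sortByKeyThenTimestamp(unordered).forEach(System.out::println);
    }
}
```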


Compatibility, Deprecation, and Migration Plan

...

The range interactive queries will be tested in the versioned-store IQv2 integration test (as the non-versioned range queries are). Moreover, unit tests will be added wherever needed.

Rejected Alternatives

The initial plan was to provide ordering by key and/or timestamp. This was removed from the KIP and may be provided by subsequent KIPs, depending on user demand.