...

Current state: Under Discussion

Discussion thread:   here

JIRA: KAFKA-15348 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The main goal is to support interactive queries in the presence of versioned state stores (KIP-889) in AK. This KIP is the successor of KIP-960 and KIP-968.
For this KIP, the following query types are considered for implementation:

Range Queries

  1. key-range latest-value query
  2. key-range with lower bound latest-value query
  3. key-range with upper bound latest-value query
  4. all-keys (no bound) latest-value query
  5. key-range query with timestamp (upper) bound
  6. key-range with lower bound with timestamp (upper) bound 
  7. key-range with upper bound with timestamp (upper) bound
  8. all-keys (no bound) with timestamp (upper) bound
  9. key-range query with timestamp range
  10. key-range query with lower bound with timestamp range
  11. key-range query with upper bound with timestamp range
  12. all-keys (no bound) with timestamp range
  13. key-range query all-versions
  14. key-range query with lower bound all-versions
  15. key-range query with upper bound all-versions
  16. all-keys query (no bound) all-versions (entire store)

...

In this KIP we propose the public class MultiVersionedRangeQuery, which is described in the next section. Moreover, a method will be added to the VersionedKeyValueStore interface.

Proposed Changes

To support range queries, the MultiVersionedRangeQuery class is used.

  • The methods are composable. The fromTime and toTime methods specify the time range while the withLowerKeyBound and withUpperKeyBound methods specify the key bounds.
    • If a user applies the same time limit multiple times such as MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t1).fromTime(t2), then the last one wins (it will be translated to MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t2)).
    • Defining a query with time range (empty, t1] will be translated into [0, t1]
    • Defining a query with time range (t1, empty) will be translated into [t1, MAX)
    • A query with no specified time range will be translated into [0, MAX). It means that the query will return all the versions of all the records within the specified key range.
  • As explained in the javadocs, the query returns all valid records within the specified time range.
    • The fromTime specifies the starting point. There can be records which have been inserted before the fromTime and are still valid in the time range.
    • The toTime specifies the ending point. Records that have been inserted at toTime are returned by the query as well.
  • No ordering is guaranteed for the returned records.
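The translation of missing time bounds and the validity check described in the bullets above can be sketched as follows. This is only an illustration under stated assumptions: TimeRangeSketch, resolve and overlaps are hypothetical names that do not appear in the KIP, MAX is modelled as Long.MAX_VALUE, and a record version is assumed to be valid during [validFrom, validTo).

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical helper (not part of the KIP): resolves optional time bounds to a concrete range. */
final class TimeRangeSketch {
    final long fromMs; // lower bound in epoch millis, 0 when fromTime is not specified
    final long toMs;   // upper bound in epoch millis, Long.MAX_VALUE when toTime is not specified

    private TimeRangeSketch(final long fromMs, final long toMs) {
        this.fromMs = fromMs;
        this.toMs = toMs;
    }

    /** (empty, t1] becomes [0, t1]; (t1, empty) becomes [t1, MAX); no bounds becomes [0, MAX). */
    static TimeRangeSketch resolve(final Optional<Instant> fromTime, final Optional<Instant> toTime) {
        final long from = fromTime.map(Instant::toEpochMilli).orElse(0L);
        final long to = toTime.map(Instant::toEpochMilli).orElse(Long.MAX_VALUE);
        return new TimeRangeSketch(from, to);
    }

    /**
     * Assumed validity rule: a version inserted at validFromMs and replaced at validToMs
     * is returned if it was inserted at or before the end of the range and was still
     * valid after the start of the range.
     */
    boolean overlaps(final long validFromMs, final long validToMs) {
        return validFromMs <= toMs && validToMs > fromMs;
    }
}
```

With this rule a version inserted before fromTime is still returned as long as it had not been replaced by fromTime, and a version inserted exactly at toTime is included.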
MultiVersionedRangeQuery

package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving
public final class MultiVersionedRangeQuery<K, V> implements
    Query<KeyValueIterator<K, VersionedRecord<V>>> {

  private final Optional<K> lower;
  private final Optional<K> upper;
  private final Optional<Instant> fromTime;
  private final Optional<Instant> toTime;

  private MultiVersionedRangeQuery(
      final Optional<K> lower,
      final Optional<K> upper,
      final Optional<Instant> fromTime,
      final Optional<Instant> toTime) {
    this.lower = lower;
    this.upper = upper;
    this.fromTime = fromTime;
    this.toTime = toTime;
  }

  /**
   * Interactive range query using a lower and upper bound to filter the keys returned. For each
   * key the records valid within the specified time range are returned. In case the time range is
   * not specified just the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param upper The key that specifies the upper bound of the range
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withKeyRange(final K lower, final K upper);

  /**
   * Interactive range query using a lower bound to filter the keys returned. For each key the
   * records valid within the specified time range are returned. In case the time range is not
   * specified just the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower);

  /**
   * Interactive range query using an upper bound to filter the keys returned. For each key the
   * records valid within the specified time range are returned. In case the time range is not
   * specified just the latest record for each key is returned.
   * @param upper The key that specifies the upper bound of the range
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper);

  /**
   * Interactive scan query that returns all records in the store. For each key the records valid
   * within the specified time range are returned. In case the time range is not specified just
   * the latest record for each key is returned.
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> MultiVersionedRangeQuery<K, V> allKeys();

  /**
   * Specifies the starting time point for the key query. The range query returns all the records
   * that are valid in the time range starting from the timestamp {@code fromTime}.
   * @param fromTime The starting time point
   */
  public MultiVersionedRangeQuery<K, V> fromTime(Instant fromTime);

  /**
   * Specifies the ending time point for the key query. The range query returns all the records that
   * have timestamp <= {@code toTime}.
   * @param toTime The ending time point
   */
  public MultiVersionedRangeQuery<K, V> toTime(Instant toTime);

  /**
   * The lower bound of the query, if specified.
   */
  public Optional<K> lowerKeyBound();

  /**
   * The upper bound of the query, if specified.
   */
  public Optional<K> upperKeyBound();

  /**
   * The starting time point of the query, if specified.
   */
  public Optional<Instant> fromTime();

  /**
   * The ending time point of the query, if specified.
   */
  public Optional<Instant> toTime();

}
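The class above only declares signatures. One possible way to realize the composable, "last one wins" behaviour is an immutable object whose fromTime and toTime methods return a modified copy. The sketch below is an assumption about how this could look, not the KIP's implementation: RangeQuerySketch is a hypothetical stand-in with no Kafka dependencies, and it represents a missing key bound by passing null to the factory.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical stand-in for MultiVersionedRangeQuery, reduced to its bounds. */
final class RangeQuerySketch<K> {
    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private RangeQuerySketch(final Optional<K> lower, final Optional<K> upper,
                             final Optional<Instant> fromTime, final Optional<Instant> toTime) {
        this.lower = lower;
        this.upper = upper;
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    // Static factories fix the key bounds; a null bound means "unbounded" in this sketch.
    static <K> RangeQuerySketch<K> withKeyRange(final K lower, final K upper) {
        return new RangeQuerySketch<>(Optional.ofNullable(lower), Optional.ofNullable(upper),
                Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> withLowerKeyBound(final K lower) {
        return withKeyRange(lower, null);
    }

    static <K> RangeQuerySketch<K> withUpperKeyBound(final K upper) {
        return withKeyRange(null, upper);
    }

    static <K> RangeQuerySketch<K> allKeys() {
        return RangeQuerySketch.<K>withKeyRange(null, null);
    }

    // Each call returns a new instance, so a later call simply replaces the earlier bound.
    RangeQuerySketch<K> fromTime(final Instant fromTime) {
        return new RangeQuerySketch<>(lower, upper, Optional.ofNullable(fromTime), this.toTime);
    }

    RangeQuerySketch<K> toTime(final Instant toTime) {
        return new RangeQuerySketch<>(lower, upper, this.fromTime, Optional.ofNullable(toTime));
    }

    Optional<K> lowerKeyBound() { return lower; }
    Optional<K> upperKeyBound() { return upper; }
    Optional<Instant> fromTime() { return fromTime; }
    Optional<Instant> toTime() { return toTime; }
}
```

Because every instance is immutable, a partially built query can be shared and extended in different ways without one caller affecting another.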

Examples

The following example illustrates the use of the MultiVersionedRangeQuery class to query a versioned state store.

Imagine we have the following records:

put(1, 1, time=2023-01-01T10:00:00.00Z)

put(1, null, time=2023-01-05T10:00:00.00Z)

put(2, 20, time=2023-01-10T10:00:00.00Z)

put(3, 30, time=2023-01-12T10:00:00.00Z)

put(1, 2, time=2023-01-15T10:00:00.00Z)

put(1, 3, time=2023-01-20T10:00:00.00Z)

put(2, 30, time=2023-01-25T10:00:00.00Z)

// example 1: MultiVersionedRangeQuery without specifying any time bound will be interpreted as all versions
// inStore is the static factory StateQueryRequest.inStore; kafkaStreams is a running KafkaStreams instance
final MultiVersionedRangeQuery<Integer, Integer> query1 =
        MultiVersionedRangeQuery.withKeyRange(1, 2);
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 = kafkaStreams.query(request1);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults1 = versionedRangeResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            // record timestamps are epoch milliseconds
            final long timestamp = record.value.timestamp();
            // validTo() is assumed to return Optional<Long>; it is empty for the latest version of a key
            final String validTo = record.value.validTo()
                    .map(ts -> Instant.ofEpochMilli(ts).toString())
                    .orElse("now");
            System.out.println("key,value: " + key + "," + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + validTo);
        }
    }
}
/* the printed output will be
    key,value: 1,1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
*/

// example 2: The value of the records with key range (1,2) from 2023-01-17T10:00:00.00Z till 2023-01-30T10:00:00.00Z
MultiVersionedRangeQuery<Integer, Integer> query2 = MultiVersionedRangeQuery.withKeyRange(1, 2);
query2 = query2.fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-30T10:00:00.00Z"));

final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 = kafkaStreams.query(request2);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 = versionedRangeResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            // record timestamps are epoch milliseconds
            final long timestamp = record.value.timestamp();
            // validTo() is assumed to return Optional<Long>; it is empty for the latest version of a key
            final String validTo = record.value.validTo()
                    .map(ts -> Instant.ofEpochMilli(ts).toString())
                    .orElse("now");
            System.out.println("key,value: " + key + "," + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + validTo);
        }
    }
}

/* the printed output will be
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
*/
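To make it easier to see why the two examples return these records, the following stand-alone sketch replays the seven put calls against an in-memory structure and applies the validity rule described in Proposed Changes. It does not use Kafka Streams: VersionedRangeSimulation and its methods are hypothetical, tombstones are modelled as null values, and the result order (by key, then by timestamp) is just an artifact of the TreeMap used here, since the KIP guarantees no ordering.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of a versioned store, for illustration only. */
final class VersionedRangeSimulation {
    // key -> (insertion timestamp -> value); a null value is a tombstone
    private final TreeMap<Integer, TreeMap<Instant, Integer>> store = new TreeMap<>();

    void put(final int key, final Integer value, final String time) {
        store.computeIfAbsent(key, k -> new TreeMap<>()).put(Instant.parse(time), value);
    }

    /** Returns all versions of keys in [lowerKey, upperKey] that are valid within [from, to]. */
    List<String> query(final int lowerKey, final int upperKey, final Instant from, final Instant to) {
        final List<String> results = new ArrayList<>();
        for (final Map.Entry<Integer, TreeMap<Instant, Integer>> keyEntry
                : store.subMap(lowerKey, true, upperKey, true).entrySet()) {
            final TreeMap<Instant, Integer> versions = keyEntry.getValue();
            for (final Map.Entry<Instant, Integer> version : versions.entrySet()) {
                if (version.getValue() == null) {
                    continue; // tombstones are not returned
                }
                // A version stays valid until the next put for the same key (null means "now").
                final Instant validTo = versions.higherKey(version.getKey());
                final boolean insertedInTime = !version.getKey().isAfter(to);
                final boolean stillValid = validTo == null || validTo.isAfter(from);
                if (insertedInTime && stillValid) {
                    results.add(keyEntry.getKey() + "," + version.getValue()
                            + " @" + version.getKey()
                            + " till " + (validTo == null ? "now" : validTo.toString()));
                }
            }
        }
        return results;
    }

    /** Loads the records listed in the Examples section. */
    static VersionedRangeSimulation withExampleData() {
        final VersionedRangeSimulation sim = new VersionedRangeSimulation();
        sim.put(1, 1, "2023-01-01T10:00:00.00Z");
        sim.put(1, null, "2023-01-05T10:00:00.00Z");
        sim.put(2, 20, "2023-01-10T10:00:00.00Z");
        sim.put(3, 30, "2023-01-12T10:00:00.00Z");
        sim.put(1, 2, "2023-01-15T10:00:00.00Z");
        sim.put(1, 3, "2023-01-20T10:00:00.00Z");
        sim.put(2, 30, "2023-01-25T10:00:00.00Z");
        return sim;
    }

    public static void main(final String[] args) {
        final VersionedRangeSimulation sim = withExampleData();
        // example 1: no time bound, modelled here as [EPOCH, Instant.MAX]
        sim.query(1, 2, Instant.EPOCH, Instant.MAX).forEach(System.out::println);
        // example 2: time range [2023-01-17, 2023-01-30]
        sim.query(1, 2, Instant.parse("2023-01-17T10:00:00.00Z"),
                Instant.parse("2023-01-30T10:00:00.00Z")).forEach(System.out::println);
    }
}
```

In example 2 the version (1, 2) inserted on 2023-01-15 is included even though it precedes fromTime, because it was still valid until 2023-01-20, while the version (1, 1) is excluded because it was already replaced on 2023-01-05.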


Compatibility, Deprecation, and Migration Plan

...

The range interactive queries will be tested in the versioned store IQv2 integration tests (like non-versioned range queries). Moreover, there will be unit tests wherever needed.

Rejected Alternatives

The initial plan was to provide ordering based on key and/or timestamp. This was removed from the KIP and may be provided by subsequent KIPs based on user demand.