...

The main goal is to support interactive queries in the presence of versioned state stores (KIP-889) in AK. This KIP is the successor of KIP-960 and KIP-969.
This KIP considers implementing the following query types.

...

Public Interfaces

In this KIP we propose two new public classes, VersionedKeyQuery and VersionedRangeQuery, which are described in the next section. Moreover, the public interface ValueIterator is used to iterate over the different values returned from a single-key query (each value corresponds to a timestamp).

Proposed Changes

...

 


To support range queries, the VersionedRangeQuery class is used.

Code Block (Java): VersionedRangeQuery
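package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving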
public final class VersionedRangeQuery<K, V> implements
    Query<KeyValueIterator<K, VersionedRecord<V>>> {

  private final Optional<K> lower;
  private final Optional<K> upper;
  private final Optional<Instant> fromTimestamp;
  private final Optional<Instant> asOfTimestamp;
  private final boolean isKeyAscending;
  private final boolean isTimeAscending;
  private final boolean isOrderedByKey;

  private VersionedRangeQuery(
      final Optional<K> lower,
      final Optional<K> upper,
      final Optional<Instant> fromTimestamp,
      final Optional<Instant> asOfTimestamp,
      final boolean isOrderedByKey,
      final boolean isKeyAscending,
      final boolean isTimeAscending) {
    this.lower = lower;
    this.upper = upper;
    this.fromTimestamp = fromTimestamp;
    this.asOfTimestamp = asOfTimestamp;
    this.isOrderedByKey = isOrderedByKey;
    this.isKeyAscending = isKeyAscending;
    this.isTimeAscending = isTimeAscending;
  }

  /**
   * Interactive range query using a lower and upper bound to filter the keys returned. For each
   * key, the records valid within the specified time range are returned. If the time range is
   * not specified, only the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param upper The key that specifies the upper bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> VersionedRangeQuery<K, V> withRange(final K lower, final K upper);

  /**
   * Interactive range query using a lower bound to filter the keys returned. For each key, the
   * records valid within the specified time range are returned. If the time range is not
   * specified, only the latest record for each key is returned.
   * @param lower The key that specifies the lower bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> VersionedRangeQuery<K, V> withLowerBound(final K lower);

  /**
   * Interactive range query using an upper bound to filter the keys returned. For each key, the
   * records valid within the specified time range are returned. If the time range is not
   * specified, only the latest record for each key is returned.
   * @param upper The key that specifies the upper bound of the range
   * @param <K>   The key type
   * @param <V>   The value type
   */
  public static <K, V> VersionedRangeQuery<K, V> withUpperBound(final K upper);

  /**
   * Interactive scan query that returns all records in the store. For each key, the records valid
   * within the specified time range are returned. If the time range is not specified, only
   * the latest record for each key is returned.
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> VersionedRangeQuery<K, V> withNoBounds();

  /**
   * Specifies the starting time point for the range query. The range query returns all the records
   * that are valid in the time range starting from the timestamp {@code fromTimestamp}.
   * @param fromTimestamp The starting time point
   */
  public VersionedRangeQuery<K, V> from(Instant fromTimestamp);

  /**
   * Specifies the ending time point for the range query. The range query returns all the records
   * that have a timestamp less than or equal to {@code asOfTimestamp}.
   * @param asOfTimestamp The ending time point
   */
  public VersionedRangeQuery<K, V> asOf(Instant asOfTimestamp);

  /**
   * Specifies the starting and ending points of the range query as MIN and MAX respectively.
   * Therefore, the query returns all the existing records in the state store with keys within the
   * specified key range.
   * @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} has already been
   *                          specified.
   */
  public VersionedRangeQuery<K, V> allVersions();

  /**
   * Specifies the overall order of returned records by timestamp.
   */
  public VersionedRangeQuery<K, V> orderByTimestamp();

  /**
   * Specifies the order of keys as descending.
   */
  public VersionedRangeQuery<K, V> withDescendingKeys();

  /**
   * Specifies the order of the timestamps as descending.
   */
  public VersionedRangeQuery<K, V> withDescendingTimestamps();

  /**
   * The lower bound of the query, if specified.
   */
  public Optional<K> getLowerKeyBound();

  /**
   * The upper bound of the query, if specified.
   */
  public Optional<K> getUpperKeyBound();

  /**
   * The starting time point of the query, if specified.
   */
  public Optional<Instant> getFromTimestamp();

  /**
   * The ending time point of the query, if specified.
   */
  public Optional<Instant> getAsOfTimestamp();

  /**
   * @return true if the query orders the returned records by key
   */
  public boolean isOrderedByKey();

  /**
   * @return true if the query returns records in ascending order of keys
   */
  public boolean isKeyAscending();

  /**
   * @return true if the query returns records in ascending order of timestamps
   */
  public boolean isTimeAscending();

}
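
The fluent methods above are listed as signatures only. As a rough illustration of how they could compose, the following is a minimal sketch and not the proposed implementation. The class name RangeQuerySketch is hypothetical, the value type parameter and the ordering flags are omitted for brevity, and the choice of IllegalStateException and of Instant.MIN and Instant.MAX for allVersions() is an assumption based only on the Javadoc above.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical, simplified stand-in for VersionedRangeQuery (assumption: not the Kafka Streams code).
final class RangeQuerySketch<K> {
    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTimestamp;
    private final Optional<Instant> asOfTimestamp;

    private RangeQuerySketch(final Optional<K> lower,
                             final Optional<K> upper,
                             final Optional<Instant> fromTimestamp,
                             final Optional<Instant> asOfTimestamp) {
        this.lower = lower;
        this.upper = upper;
        this.fromTimestamp = fromTimestamp;
        this.asOfTimestamp = asOfTimestamp;
    }

    // Key range with both bounds; no time range yet, i.e. "latest record per key".
    static <K> RangeQuerySketch<K> withRange(final K lower, final K upper) {
        return new RangeQuerySketch<>(Optional.of(lower), Optional.of(upper),
                Optional.empty(), Optional.empty());
    }

    // Full scan over all keys.
    static <K> RangeQuerySketch<K> withNoBounds() {
        return new RangeQuerySketch<>(Optional.empty(), Optional.empty(),
                Optional.empty(), Optional.empty());
    }

    // Each modifier returns a new immutable instance instead of mutating this one.
    RangeQuerySketch<K> from(final Instant from) {
        return new RangeQuerySketch<>(lower, upper, Optional.of(from), asOfTimestamp);
    }

    RangeQuerySketch<K> asOf(final Instant asOf) {
        return new RangeQuerySketch<>(lower, upper, fromTimestamp, Optional.of(asOf));
    }

    // Assumption: MIN and MAX are modeled as Instant.MIN and Instant.MAX, and the
    // RuntimeException from the Javadoc is modeled as IllegalStateException.
    RangeQuerySketch<K> allVersions() {
        if (fromTimestamp.isPresent() || asOfTimestamp.isPresent()) {
            throw new IllegalStateException("Time range has already been specified");
        }
        return new RangeQuerySketch<>(lower, upper,
                Optional.of(Instant.MIN), Optional.of(Instant.MAX));
    }

    Optional<K> getLowerKeyBound() { return lower; }
    Optional<K> getUpperKeyBound() { return upper; }
    Optional<Instant> getFromTimestamp() { return fromTimestamp; }
    Optional<Instant> getAsOfTimestamp() { return asOfTimestamp; }
}
```

In this sketch, RangeQuerySketch.withRange(1, 2).from(t) leaves the upper time bound unset, while calling allVersions() after from(t) fails, mirroring the @throws clause documented above.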


Examples

The following example illustrates the use of the VersionedRangeQuery class to query a versioned state store.

Code Block (Java)
// Assumes a static import of StateQueryRequest.inStore and a running KafkaStreams instance named kafkaStreams.
final VersionedRangeQuery<Integer, Integer> query =
        VersionedRangeQuery.<Integer, Integer>withRange(1, 2)
                .from(Instant.parse("2023-08-03T10:37:30.00Z"))
                .asOf(Instant.parse("2023-09-04T10:37:30.00Z"));
 
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request =
        inStore("my_store").withQuery(query);
 
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult = kafkaStreams.query(request);
 
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults = versionedRangeResult.getPartitionResults();
for (final Map.Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    // The iterator holds store resources, so close it via try-with-resources.
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            final long timestamp = record.value.timestamp();
        }
    }
}
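
To make the combined key-range and time-range filtering in the example more concrete, here is a small in-memory sketch. It is not how the versioned store is implemented. The class VersionedRangeSketch and its methods are hypothetical, and the validity rule is an assumption derived from the Javadoc of from() and asOf(): a version is taken to be valid from its own timestamp until the timestamp of the next version of the same key, and it is returned when its timestamp is at most asOf and its validity ends after from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of a versioned store (assumption: illustration only).
final class VersionedRangeSketch {

    // One version of a key: the value and the timestamp at which it became valid.
    static final class Version {
        final int key;
        final String value;
        final long timestamp;

        Version(final int key, final String value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // key -> (timestamp -> value); both levels are sorted in ascending order.
    private final NavigableMap<Integer, NavigableMap<Long, String>> store = new TreeMap<>();

    void put(final int key, final String value, final long timestamp) {
        store.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Returns versions with lower <= key <= upper whose validity interval overlaps
    // the time range, ordered by key and then by ascending timestamp.
    List<Version> range(final int lower, final int upper, final long from, final long asOf) {
        final List<Version> result = new ArrayList<>();
        for (final Map.Entry<Integer, NavigableMap<Long, String>> byKey
                : store.subMap(lower, true, upper, true).entrySet()) {
            final NavigableMap<Long, String> versions = byKey.getValue();
            for (final Map.Entry<Long, String> version : versions.entrySet()) {
                // Assumed rule: a version stays valid until the next version's timestamp.
                final Long nextTimestamp = versions.higherKey(version.getKey());
                final long validTo = nextTimestamp == null ? Long.MAX_VALUE : nextTimestamp;
                if (version.getKey() <= asOf && validTo > from) {
                    result.add(new Version(byKey.getKey(), version.getValue(), version.getKey()));
                }
            }
        }
        return result;
    }
}
```

Under these assumptions, a version written before the start of the time range is still returned if no newer version of its key replaced it before that start, which is one reading of "records valid within the specified time range".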

...