Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-15346

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The main goal is to support interactive queries (IQs) on versioned state stores (KIP-889) in AK. This KIP discusses single-key, single-timestamp queries. Other types of IQs are explained in the following KIPs (KIP-968 and KIP-969).

Key Queries with single timestamp:

  1. single-key latest-value lookup
  2. single-key lookup with asOf timestamp


Proposed Changes

In this KIP we introduce the class VersionedKeyQuery with an Optional field that stores the asOfTimestamp value. The asOf method creates a key query with the given asOfTimestamp value.
A latest() method is not needed, since returning the latest value has always been the default. In other words, if a query is created without calling asOf(), it returns the latest value of the key.
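
The default-to-latest behavior described above can be sketched with a small stand-in class. KeyQuerySketch is a hypothetical name used only for illustration and is not part of Kafka Streams; it mirrors the factory-plus-asOf shape under the assumption that a missing (or null) timestamp means "latest".

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for illustration only; this is not the Kafka Streams class.
final class KeyQuerySketch<K> {
    private final K key;
    private final Optional<Instant> asOfTimestamp;

    private KeyQuerySketch(final K key, final Optional<Instant> asOfTimestamp) {
        this.key = Objects.requireNonNull(key, "key");
        this.asOfTimestamp = asOfTimestamp;
    }

    // No timestamp is given, so the query targets the latest value.
    static <K> KeyQuerySketch<K> withKey(final K key) {
        return new KeyQuerySketch<>(key, Optional.empty());
    }

    // Returns a new query bounded by asOfTimestamp; null falls back to "latest".
    KeyQuerySketch<K> asOf(final Instant asOfTimestamp) {
        return new KeyQuerySketch<>(key, Optional.ofNullable(asOfTimestamp));
    }

    K key() {
        return key;
    }

    Optional<Instant> asOfTimestamp() {
        return asOfTimestamp;
    }

    public static void main(final String[] args) {
        final KeyQuerySketch<Integer> latest = KeyQuerySketch.withKey(1);
        final KeyQuerySketch<Integer> bounded = latest.asOf(Instant.parse("2023-08-03T10:37:30Z"));
        System.out.println(latest.asOfTimestamp().isPresent());  // false
        System.out.println(bounded.asOfTimestamp().isPresent()); // true
    }
}
```

Keeping the timestamp in an Optional lets a single class represent both query types without a separate latest() factory.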

VersionedKeyQuery.java

package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.VersionedRecord;

/**
 * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp.
 * <p>
 * See KIP-960 for more details.
 */
@Evolving
public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> {

    private final K key;
    private final Optional<Instant> asOfTimestamp;

    private VersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp) {
        this.key = Objects.requireNonNull(key);
        this.asOfTimestamp = asOfTimestamp;
    }

    /**
     * Creates a query that will retrieve the latest record from a versioned state store identified by {@code key} if the key exists
     * (or {@code null} otherwise).
     * @param key The key to retrieve
     * @param <K> The type of the key
     * @param <V> The type of the value that will be retrieved
     * @throws NullPointerException if {@code key} is null
     */
    public static <K, V> VersionedKeyQuery<K, V> withKey(final K key);

    /**
     * Specifies the as-of timestamp for the key query. The key query returns the record
     * with the greatest timestamp <= asOfTimestamp.
     * @param asOfTimestamp The as-of timestamp; if null, it is treated as Optional.empty()
     */
    public VersionedKeyQuery<K, V> asOf(final Instant asOfTimestamp);

    /**
     * The key that was specified for this query.
     */
    public K key();

    /**
     * The upper bound for timestamp of the query, if specified.
     */
    public Optional<Instant> asOfTimestamp();
}
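
The "greatest timestamp <= asOfTimestamp" rule can be illustrated with a sorted map. The sketch below is a simplified in-memory model of one key's version history and makes no claim about how the versioned store is implemented; AsOfLookupSketch and its methods are hypothetical names, and details such as history retention and tombstones are left out.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory model of one key's version history; not the Kafka Streams store.
final class AsOfLookupSketch {
    // Versions of a single key, sorted by record timestamp (epoch milliseconds).
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void put(final long timestamp, final String value) {
        versions.put(timestamp, value);
    }

    // An empty asOf means "latest"; otherwise pick the greatest timestamp <= asOf.
    Optional<String> get(final Optional<Long> asOf) {
        final Map.Entry<Long, String> entry = asOf.isPresent()
            ? versions.floorEntry(asOf.get())
            : versions.lastEntry();
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    public static void main(final String[] args) {
        final AsOfLookupSketch history = new AsOfLookupSketch();
        history.put(100L, "v1");
        history.put(200L, "v2");
        System.out.println(history.get(Optional.empty()));  // Optional[v2]
        System.out.println(history.get(Optional.of(150L))); // Optional[v1]
        System.out.println(history.get(Optional.of(50L)));  // Optional.empty
    }
}
```

A lookup earlier than the oldest version yields no result, which corresponds to the null result described in the withKey Javadoc.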


Examples

The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.

// HISTORY_RETENTION is the store's history retention in milliseconds, defined elsewhere.
builder.table(
    "my_topic",
    Consumed.with(Serdes.Integer(), Serdes.Integer()),
    Materialized.as(Stores.persistentVersionedKeyValueStore(
        "my_store",
        Duration.ofMillis(HISTORY_RETENTION))));

// The explicit type arguments are needed because V cannot be inferred through the chained asOf() call.
final VersionedKeyQuery<Integer, Integer> query =
        VersionedKeyQuery.<Integer, Integer>withKey(1).asOf(Instant.parse("2023-08-03T10:37:30.00Z"));

final StateQueryRequest<VersionedRecord<Integer>> request =
        inStore("my_store").withQuery(query);

final StateQueryResult<VersionedRecord<Integer>> result = kafkaStreams.query(request);

// Get the results from all partitions.
final Map<Integer, QueryResult<VersionedRecord<Integer>>> partitionResults = result.getPartitionResults();
for (final Entry<Integer, QueryResult<VersionedRecord<Integer>>> entry : partitionResults.entrySet()) {
    final VersionedRecord<Integer> record = entry.getValue().getResult();
    // The result is null if the key has no record at or before the as-of timestamp.
    if (record != null) {
        final Integer value = record.value();
        final Long timestamp = record.timestamp();
    }
}
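
The query in the example takes an Instant, while the timestamp read from a VersionedRecord is a long in epoch milliseconds. A minimal sketch of converting between the two using only java.time follows; TimestampConversionSketch and its helper names are hypothetical and exist only for this illustration.

```java
import java.time.Instant;

// Hypothetical helper for illustration; only java.time is used.
final class TimestampConversionSketch {
    // Epoch milliseconds for an ISO-8601 instant string.
    static long toEpochMillis(final String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    // Instant for a record timestamp given in epoch milliseconds.
    static Instant toInstant(final long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(final String[] args) {
        final long millis = toEpochMillis("2023-08-03T10:37:30.00Z");
        System.out.println(millis);            // 1691059050000
        System.out.println(toInstant(millis)); // 2023-08-03T10:37:30Z
    }
}
```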


Compatibility, Deprecation, and Migration Plan

  • Since this is a completely new set of APIs, no backward compatibility concerns are anticipated. 
  • Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.

Test Plan

The single-key, single-timestamp interactive queries will be tested in the versioned-store IQv2 integration test (in the same way as non-versioned key queries).