Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-15346

...

  1. single-key latest-value lookup
  2. single-key lookup with asOf timestamp (upper) bound


Proposed Changes

In this KIP we introduce the class VersionedKeyQuery, which has an Optional field to store the asOfTimestamp value. The method asOf() creates key queries that carry an asOfTimestamp value.
Defining a latest() method is not needed, since returning the latest value has always been the default assumption. In other words, if a query is created without calling the asOf() method, it will return the latest value of the key.

VersionedKeyQuery.java
package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.VersionedRecord;

/**
 * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp.
 * <p>
 * See KIP-960 for more details.
 */
@Evolving
public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> {

    private final K key;
    private final boolean skipCache;
    private final Optional<Instant> asOfTimestamp;

    private VersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp, final boolean skipCache) {
        this.key = Objects.requireNonNull(key);
        this.asOfTimestamp = asOfTimestamp;
        this.skipCache = skipCache;
    }

    /**
     * Creates a query that will retrieve the latest record from a versioned state store identified by {@code key}
     * if the key exists (or {@code null} otherwise).
     * @param key The key to retrieve
     * @param <K> The type of the key
     * @param <V> The type of the value that will be retrieved
     * @throws NullPointerException if {@code key} is null
     */
    public static <K, V> VersionedKeyQuery<K, V> withKey(final K key) {
        // No asOf bound: the query returns the latest record for the key.
        return new VersionedKeyQuery<>(key, Optional.empty(), false);
    }

    /**
     * Specifies the as-of timestamp for the key query. The key query returns the record
     * with the greatest timestamp <= asOfTimestamp.
     * @param asOfTimestamp The as-of timestamp (inclusive upper bound). If {@code asOfTimestamp} is null,
     *                      it will be considered as Optional.empty()
     */
    public VersionedKeyQuery<K, V> asOf(final Instant asOfTimestamp) {
        // ofNullable implements the documented null handling.
        return new VersionedKeyQuery<>(key, Optional.ofNullable(asOfTimestamp), skipCache);
    }

    /**
     * The key that was specified for this query.
     */
    public K key() {
        return key;
    }

    /**
     * The timestamp of the query, if specified.
     */
    public Optional<Instant> asOfTimestamp() {
        return asOfTimestamp;
    }

    /**
     * The flag whether to skip the cache or not during query evaluation.
     */
    public boolean isSkipCache() {
        return skipCache;
    }
}
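
The lookup rule stated in the asOf() Javadoc (return the record with the greatest timestamp <= asOfTimestamp, or the latest record when no bound is given) can be illustrated without Kafka at all. The following is a minimal sketch, not the store implementation: AsOfLookupSketch and its methods are hypothetical names, the versions of a single key are held in a plain TreeMap, and history retention and tombstones are ignored.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical stand-in for the versions of ONE key; not part of Kafka Streams. */
public class AsOfLookupSketch {

    // Versions of a single key, ordered by record timestamp.
    private final TreeMap<Instant, String> versions = new TreeMap<>();

    public void put(final Instant timestamp, final String value) {
        versions.put(timestamp, value);
    }

    /**
     * Empty bound: return the latest version.
     * Present bound: return the version with the greatest timestamp <= bound.
     * Returns null when no version qualifies.
     */
    public String lookup(final Optional<Instant> asOfTimestamp) {
        final Map.Entry<Instant, String> entry = asOfTimestamp.isPresent()
            ? versions.floorEntry(asOfTimestamp.get())
            : versions.lastEntry();
        return entry == null ? null : entry.getValue();
    }

    public static void main(final String[] args) {
        final AsOfLookupSketch sketch = new AsOfLookupSketch();
        sketch.put(Instant.parse("2023-08-03T10:00:00Z"), "v1");
        sketch.put(Instant.parse("2023-08-03T11:00:00Z"), "v2");

        // No bound behaves like withKey(...) alone.
        System.out.println(sketch.lookup(Optional.empty()));
        // A bound between the two versions behaves like asOf(...).
        System.out.println(sketch.lookup(Optional.of(Instant.parse("2023-08-03T10:37:30Z"))));
    }
}
```

Because the bound is inclusive, a query whose asOfTimestamp equals a record's timestamp returns that record, which is why the sketch uses floorEntry rather than lowerEntry.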


Examples

...

The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.

// Materialize the table into a versioned store so that older record versions remain queryable.
builder.table(
    "my_topic",
    Consumed.with(Serdes.Integer(), Serdes.Integer()),
    Materialized.as(Stores.persistentVersionedKeyValueStore(
        "my_store",
        Duration.ofMillis(HISTORY_RETENTION)
    ))
);

// The explicit type arguments are needed because the chained asOf() call prevents inference of V.
final VersionedKeyQuery<Integer, Integer> query =
    VersionedKeyQuery.<Integer, Integer>withKey(1).asOf(Instant.parse("2023-08-03T10:37:30.00Z"));

// inStore is a static import of StateQueryRequest.inStore.
final StateQueryRequest<VersionedRecord<Integer>> request =
    inStore("my_store").withQuery(query);

final StateQueryResult<VersionedRecord<Integer>> result = kafkaStreams.query(request);
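
The query above takes its bound as an Instant, whereas VersionedRecord reports its timestamp as a long. When comparing the two, for example in a test, it can help to convert the bound to epoch milliseconds first. A small sketch using only java.time; the class and method names are hypothetical, and it assumes record timestamps are epoch milliseconds:

```java
import java.time.Instant;

/** Hypothetical helper; not part of Kafka Streams. */
public class AsOfTimestampSketch {

    /** Converts an ISO-8601 UTC instant string into epoch milliseconds. */
    public static long toEpochMillis(final String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    /** True if a record timestamp (assumed epoch millis) satisfies the inclusive asOf bound. */
    public static boolean withinBound(final long recordTimestampMs, final Instant asOfTimestamp) {
        return recordTimestampMs <= asOfTimestamp.toEpochMilli();
    }

    public static void main(final String[] args) {
        final Instant bound = Instant.parse("2023-08-03T10:37:30.00Z");
        System.out.println(toEpochMillis("2023-08-03T10:37:30.00Z"));
        System.out.println(withinBound(bound.toEpochMilli(), bound));
    }
}
```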

...

  • Since this is a completely new set of APIs, no backward compatibility concerns are anticipated. 
  • Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.

Test Plan

The single-key, single-timestamp interactive queries will be tested in the versioned store IQv2 integration test (like the non-versioned key queries).