Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-15346

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The main goal is to support interactive queries (IQs) in the presence of versioned state stores (KIP-889) in AK. This KIP covers single-key, single-timestamp queries; the other types of IQs are covered in KIP-968 and KIP-969. The following query types are considered for implementation in this KIP:

Key Queries with single timestamp:

  1. single-key latest-value lookup
  2. single-key lookup with timestamp (upper) bound
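
The difference between the two lookups can be pictured with a small in-memory model. The sketch below is not part of the proposal and does not use any Kafka Streams class: AsOfLookupSketch, its put method, and the use of a TreeMap are assumptions made purely for illustration, and store details such as history retention and tombstones are ignored.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Toy stand-in for the version history of a single key (hypothetical, not a Kafka Streams class). */
public class AsOfLookupSketch {

    // All versions of one key, ordered by record timestamp.
    private final TreeMap<Instant, String> versions = new TreeMap<>();

    public void put(final Instant timestamp, final String value) {
        versions.put(timestamp, value);
    }

    /** Query type 1: the value with the greatest timestamp overall. */
    public Optional<String> latest() {
        return Optional.ofNullable(versions.lastEntry()).map(Map.Entry::getValue);
    }

    /** Query type 2: the value with the greatest timestamp that is <= asOfTimestamp. */
    public Optional<String> asOf(final Instant asOfTimestamp) {
        return Optional.ofNullable(versions.floorEntry(asOfTimestamp)).map(Map.Entry::getValue);
    }
}
```

In this model, a bound that falls between two versions returns the older one, a bound equal to a version's timestamp returns that version (the bound is inclusive), and a bound earlier than every version returns an empty result.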


Proposed Changes

We propose adding a new Optional field to KeyQuery that stores the asOfTimestamp value, together with an asOf method for creating key queries that carry an asOfTimestamp.
A separate latest() method is not needed, because returning the latest value has always been the default behavior.

KeyQuery.java
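package org.apache.kafka.streams.query;

import org.apache.kafka.common.annotation.InterfaceStability.Evolving;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
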
/**
 * Interactive query for retrieving a single record based on its key.
 */
@Evolving
public final class KeyQuery<K, V> implements Query<V> {

    private final K key;
    private final boolean skipCache;
    private final Optional<Instant> asOfTimestamp;

    private KeyQuery(final K key, final Optional<Instant> asOfTimestamp, final boolean skipCache) {
        this.key = Objects.requireNonNull(key);
        this.asOfTimestamp = asOfTimestamp;
        this.skipCache = skipCache;
    }

    /**
     * Creates a query that will retrieve the record identified by {@code key} if it exists
     * (or {@code null} otherwise).
     * @param key The key to retrieve
     * @param <K> The type of the key
     * @param <V> The type of the value that will be retrieved
     */
    public static <K, V> KeyQuery<K, V> withKey(final K key);
    
    /**
     * Specifies the inclusive upper bound on the record timestamp. The query returns the record
     * with the greatest timestamp that is less than or equal to {@code asOfTimestamp}.
     * @param asOfTimestamp The inclusive upper bound for the timestamp
     */
    public KeyQuery<K, V> asOf(final Instant asOfTimestamp);
    
    /**
     * Specifies that the cache should be skipped during query evaluation. This means, that the query
     * will always get forwarded to the underlying store.
     */
    public KeyQuery<K, V> skipCache();

    /**
     * The key that was specified for this query.
     */
    public K getKey();

    /**
     * The timestamp of the query, if specified
     */
    public Optional<Instant> getAsOfTimestamp();

    /**
     * The flag whether to skip the cache or not during query evaluation.
     */
    public boolean isSkipCache();
}
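
The listing above only declares the method signatures. As a rough sketch of how the bodies might look, the standalone class below keeps the query immutable, so that asOf and skipCache each return a new instance. The name KeyQuerySketch is hypothetical, the class deliberately does not implement Query<V> so that it compiles without Kafka on the classpath, and rejecting a null bound in asOf is an assumption rather than something this KIP specifies.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

/** Standalone illustration only; the proposed class would additionally implement Query<V>. */
public final class KeyQuerySketch<K, V> {

    private final K key;
    private final Optional<Instant> asOfTimestamp;
    private final boolean skipCache;

    private KeyQuerySketch(final K key, final Optional<Instant> asOfTimestamp, final boolean skipCache) {
        this.key = Objects.requireNonNull(key);
        this.asOfTimestamp = asOfTimestamp;
        this.skipCache = skipCache;
    }

    public static <K, V> KeyQuerySketch<K, V> withKey(final K key) {
        // No timestamp bound by default, which corresponds to a latest-value lookup.
        return new KeyQuerySketch<>(key, Optional.empty(), false);
    }

    public KeyQuerySketch<K, V> asOf(final Instant asOfTimestamp) {
        // Assumption: a null bound is rejected instead of being treated as "latest".
        return new KeyQuerySketch<>(key, Optional.of(Objects.requireNonNull(asOfTimestamp)), skipCache);
    }

    public KeyQuerySketch<K, V> skipCache() {
        // Carries the timestamp bound over, so asOf and skipCache can be chained in any order.
        return new KeyQuerySketch<>(key, asOfTimestamp, true);
    }

    public K getKey() {
        return key;
    }

    public Optional<Instant> getAsOfTimestamp() {
        return asOfTimestamp;
    }

    public boolean isSkipCache() {
        return skipCache;
    }
}
```

With this shape, a query built only with withKey has an empty getAsOfTimestamp(), which a store could interpret as a request for the latest value.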


Examples

The following example illustrates the use of the KeyQuery class, together with the new asOf method, to query a versioned state store.

// Materialize the topic as a versioned key-value store.
builder.table(
            "my_topic",
            Consumed.with(Serdes.Integer(), Serdes.Integer()),
            Materialized.as(Stores.persistentVersionedKeyValueStore(
                "my_store",
                Duration.ofMillis(HISTORY_RETENTION)
            ))
        );

// Look up key 1 as of the given timestamp (inclusive upper bound).
final KeyQuery<Integer, Integer> query =
        KeyQuery.<Integer, Integer>withKey(1).asOf(Instant.parse("2023-08-03T10:37:30.00Z"));

// The request and result types follow from KeyQuery<K, V> implementing Query<V>.
// inStore is statically imported from StateQueryRequest.
final StateQueryRequest<Integer> request =
        inStore("my_store").withQuery(query);

final StateQueryResult<Integer> result = kafkaStreams.query(request);


Compatibility, Deprecation, and Migration Plan

  • Since this KIP only adds a new field and a new method to KeyQuery, no backward compatibility concerns are anticipated.
  • Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.



