Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
firstline1
titleKeyQueryVersionedKeyQuery.java
package org.apache.kafka.streams.query;


/**
 * Interactive query for retrieving a single record  from a versioned state store based on its key and timestamp.
 * <p>
 * See KIP-960 for more details.
 */
@Evolving
public final class KeyQuery<KVersionedKeyQuery<K, V> implements Query<V>Query<VersionedRecord<V>> {

    private final K key;
    private final boolean skipCache;
    private final Optional<Instant> asOfTimestamp;

    private KeyQueryVersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp, final boolean skipCache) {
        this.key = Objects.requireNonNull(key);
        this.asOfTimestamp = asOfTimestamp;
        this.skipCache = skipCache;
    }

    /**
     * Creates a query that will retrieve the record from a versioned state store identified by {@code key} if it exists
     * (or {@code null} otherwise).
     * @param key The key to retrieve
     * @param <K> The type of the key
     * @param <V> The type of the value that will be retrieved
     */
    public static <K, V> KeyQuery<KVersionedKeyQuery<K, V> withKey(final K key);
    
    /**
     * Specifies the upper inclusive bound for the key query. The key query returns the record
     * with the greatest timestamp <= asOfTimestamp
     * @param asOfTimestamp The upper inclusive bound for timestamp
     */
    public KeyQuery<KVersionedKeyQuery<K, V> asOf(final Instant asOfTimestamp);
    
     /**
     * Specifies that the cache should be skipped during query evaluation. This means, that the query
     * will always get forwarded to the underlying store.
     */
    public KeyQuery<K, V> skipCache();

    /**
     * The key that was specified for this query.
     */
    public K getKey();

    /**
     * The timestamp of the query, if specified
     */
    public Optional<Instant> getAsOfTimestamp();

    /**
     * The flag whether to skip the cache or not during query evaluation.
     */
    public boolean isSkipCache();
}

}  


Examples....

The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.

Code Block
languagejava
linenumberstrue
builder.table(
            "my_topic",
            Consumed.with(Serdes.Integer(), Serdes.Integer()),
            Materialized.as(Stores.persistentVersionedKeyValueStore(
                "my_store",
                Duration.ofMillis(HISTORY_RETENTION)
            ))
        );

final KeyQuery<IntegerVersionedKeyQuery<Integer, ValueAndTimestamp<Integer>>VersionedRecord<Integer>> query = KeyQueryVersionedKeyQuery.withKey(1).asOf(Instant.parse("2023-08-03T10:37:30.00Z");

final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>>StateQueryRequest<VersionedRecord<Integer>> request =
        inStore("my_store").withQuery(query);

final StateQueryResult<ValueAndTimestamp<Integer>>StateQueryResult<VersionedRecord<Integer>> result = kafkaStreams.query(request);

...