THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.streams.query; /** * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp. * <p> * See KIP-960 for more details. */ @Evolving public final class KeyQuery<KVersionedKeyQuery<K, V> implements Query<V>Query<VersionedRecord<V>> { private final K key; private final boolean skipCache; private final Optional<Instant> asOfTimestamp; private KeyQueryVersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp, final boolean skipCache) { this.key = Objects.requireNonNull(key); this.asOfTimestamp = asOfTimestamp; this.skipCache = skipCache; } /** * Creates a query that will retrieve the record from a versioned state store identified by {@code key} if it exists * (or {@code null} otherwise). * @param key The key to retrieve * @param <K> The type of the key * @param <V> The type of the value that will be retrieved */ public static <K, V> KeyQuery<KVersionedKeyQuery<K, V> withKey(final K key); /** * Specifies the upper inclusive bound for the key query. The key query returns the record * with the greatest timestamp <= asOfTimestamp * @param asOfTimestamp The upper inclusive bound for timestamp */ public KeyQuery<KVersionedKeyQuery<K, V> asOf(final Instant asOfTimestamp); /** * Specifies that the cache should be skipped during query evaluation. This means, that the query * will always get forwarded to the underlying store. */ public KeyQuery<K, V> skipCache(); /** * The key that was specified for this query. */ public K getKey(); /** * The timestamp of the query, if specified */ public Optional<Instant> getAsOfTimestamp(); /** * The flag whether to skip the cache or not during query evaluation. */ public boolean isSkipCache(); } } |
Examples....
The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.
Code Block | ||||
---|---|---|---|---|
| ||||
builder.table( "my_topic", Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as(Stores.persistentVersionedKeyValueStore( "my_store", Duration.ofMillis(HISTORY_RETENTION) )) ); final KeyQuery<IntegerVersionedKeyQuery<Integer, ValueAndTimestamp<Integer>>VersionedRecord<Integer>> query = KeyQueryVersionedKeyQuery.withKey(1).asOf(Instant.parse("2023-08-03T10:37:30.00Z"); final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>>StateQueryRequest<VersionedRecord<Integer>> request = inStore("my_store").withQuery(query); final StateQueryResult<ValueAndTimestamp<Integer>>StateQueryResult<VersionedRecord<Integer>> result = kafkaStreams.query(request); |
...