...
The main goal is supporting interactive queries in presence of versioned state stores (KIP-889) in AK. This KIP is the successor of KIP-960 and KIP-969
For this KIP, the following query types are considered to be implemented.
...
Public Interfaces
In this KIP we propose two new the public classes: VersionedKeyQuery and , VersionedRangeQuery that will be described in the next section. Moreover, the public interface ValueIterator is used to iterate over different values that are returned from a single-key query (each value corresponds to a timestamp).
Proposed Changes
...
Proposed Changes
For supporting range queries, VersionedRangeQuery class is used.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.streams.query;
/**
* Interactive query for retrieving a set of records with keys within a specified key range and time
* range.
*/
@Evolving
public final class VersionedRangeQuery<K, V> implements
Query<KeyValueIterator<K, VersionedRecord<V>>> {
private final Optional<K> lower;
private final Optional<K> upper;
private final Optional<Instant> fromTimestamp;
private final Optional<Instant> asOfTimestamp;
private final boolean isKeyAscending;
private final boolean isTimeAscending;
private final boolean isOrderedByKey;
private VersionedRangeQuery(
final Optional<K> lower,
final Optional<K> upper,
final Optional<Instant> fromTimestamp,
final Optional<Instant> asOfTimestamp,
final boolean isOrderedByKey,
final boolean isKeyAscending,
final boolean isTimeAscending) {
this.lower = lower;
this.upper = upper;
this.fromTimestamp = fromTimestamp;
this.asOfTimestamp = asOfTimestamp;
this.isOrderedByKey = isOrderedByKey;
this.isKeyAscending = isKeyAscending;
this.isTimeAscending = isTimeAscending;
}
/**
* Interactive range query using a lower and upper bound to filter the keys returned. * For each
* key the records valid within the specified time range are returned. * In case the time range is
* not specified just the latest record for each key is returned.
* @param lower The key that specifies the lower bound of the range
* @param upper The key that specifies the upper bound of the range
* @param <K> The key type
* @param <V> The value type
*/
public static <K, V> VersionedRangeQuery<K, V> withRange(final K lower, final K upper);
/**
* Interactive range query using a lower bound to filter the keys returned. * For each key the
* records valid within the specified time range are returned. * In case the time range is not
* specified just the latest record for each key is returned.
* @param lower The key that specifies the lower bound of the range
* @param <K> The key type
* @param <V> The value type
*/
public static <K, V> VersionedRangeQuery<K, V> withLowerBound(final K lower);
/**
* Interactive range query using a lower bound to filter the keys returned. * For each key the
* records valid within the specified time range are returned. * In case the time range is not
* specified just the latest record for each key is returned.
* @param upper The key that specifies the lower bound of the range
* @param <K> The key type
* @param <V> The value type
*/
public static <K, V> VersionedRangeQuery<K, V> withUpperBound(final K upper);
/**
* Interactive scan query that returns all records in the store. * For each key the records valid
* within the specified time range are returned. * In case the time range is not specified just
* the latest record for each key is returned.
* @param <K> The key type
* @param <V> The value type
*/
public static <K, V> VersionedRangeQuery<K, V> withNoBounds();
/**
* Specifies the starting time point for the key query. The range query returns all the records
* that are valid in the time range starting from the timestamp {@code fromTimestamp}.
* @param fromTimestamp The starting time point
*/
public VersionedRangeQuery<K, V> from(Instant fromTimestamp);
/**
* Specifies the ending time point for the key query. The range query returns all the records that
* have timestamp <= {@code asOfTimestamp}.
* @param asOfTimestamp The ending time point
*/
public VersionedRangeQuery<K, V> asOf(Instant asOfTimestamp);
/**
* Specifies the starting and ending points of the range query as MIN and MAX respectively.
* Therefore, the query returns all the existing records in the state store with keys within the
* specified key range.
* @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} have been already
* specified.
*/
public VersionedRangeQuery<K, V> allVersions();
/**
* Specifies the overall order of returned records by timestamp
*/
public VersionedRangeQuery<K, V> orderByTimestamp();
/**
* Specifies the order of keys as descending.
*/
public VersionedRangeQuery<K, V> withDescendingKeys();
/**
* Specifies the order of the timestamps as descending.
*/
public VersionedRangeQuery<K, V> withDescendingTimestamps();
/**
* The lower bound of the query, if specified.
*/
public Optional<K> getLowerKeyBound();
/**
* The upper bound of the query, if specified
*/
public Optional<K> getUpperKeyBound();
/**
* The starting time point of the query, if specified
*/
public Optional<Instant> getFromTimestamp();
/**
* The ending time point of the query, if specified
*/
public Optional<Instant> getAsOfTimestamp();
/**
* @return true if the query orders the returned records by key
*/
public boolean isOrderedByKey();
/**
* @return true if the query returns records in ascending order of keys
*/
public boolean isKeyAscending();
/**
* @return true if the query returns records in ascending order of timestamps
*/
public boolean isRangeAscending();
}
|
Examples
The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.
Code Block | ||||
---|---|---|---|---|
| ||||
final VersionedRangeQuery<Integer, Integer> query = VersionedRangeQuery.keyRangeWithTimestampRangewithKeyRange(1, 2, 1690201149, 1690373949).from(Instant.parse(2023-08-03T10:37:30.00Z)).asOf(Instant.parse(2023-09-04T10:37:30.00Z)); final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request = inStore("my_store").withQuery(query); final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult = kafkaStreams.query(request); // Get the results from all partitions. final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults = versionedRangeResult.getPartitionResults(); for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) { try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) { while (iterator.hasNext()) { final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next(); Integer key = record.key; Integer value = record.value.value(); Long timestamp = record.value.timestamp(); } } } |
...