THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.streams.query; /** * Interactive query for retrieving a set of records with keys within a specified key range and time * range. */ @Evolving public final class MultiVersionedRangeQuery<K, V> implements Query<KeyValueIterator<K, VersionedRecord<V>>> { private final Optional<K> lower; private final Optional<K> upper; private final Optional<Instant> fromTime; private final Optional<Instant> toTime; private final boolean isKeyAscending; private final boolean isTimeAscending; private final boolean isOrderedByKey; private MultiVersionedRangeQuery( final Optional<K> lower, final Optional<K> upper, final Optional<Instant> fromTime, final Optional<Instant> toTime, final boolean isOrderedByKey, final boolean isKeyAscending, final boolean isTimeAscending) { this.lower = lower; this.upper = upper; this.fromTime = fromTime; this.toTime = toTime; this.isOrderedByKey = isOrderedByKey; this.isKeyAscending = isKeyAscending; this.isTimeAscending = isTimeAscending; } /** * Interactive range query using a lower and upper bound to filter the keys returned. * For each * key the records valid within the specified time range are returned. * In case the time range is * not specified just the latest record for each key is returned. * @param lower The key that specifies the lower bound of the range * @param upper The key that specifies the upper bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> MultiVersionedRangeQuery<K, V> withKeyRange(final K lower, final K upper); /** * Interactive range query using a lower bound to filter the keys returned. * For each key the * records valid within the specified time range are returned. * In case the time range is not * specified just the latest record for each key is returned. 
* @param lower The key that specifies the lower bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower); /** * Interactive range query using a lower bound to filter the keys returned. * For each key the * records valid within the specified time range are returned. * In case the time range is not * specified just the latest record for each key is returned. * @param upper The key that specifies the lower bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper); /** * Interactive scan query that returns all records in the store. * For each key the records valid * within the specified time range are returned. * In case the time range is not specified just * the latest record for each key is returned. * @param <K> The key type * @param <V> The value type */ public static <K, V> MultiVersionedRangeQuery<K, V> allKeys(); /** * Specifies the starting time point for the key query. The range query returns all the records * that are valid in the time range starting from the timestamp {@code fromTime}. * @param fromTime The starting time point */ public MultiVersionedRangeQuery<K, V> fromfromTime(Instant fromTime); /** * Specifies the ending time point for the key query. The range query returns all the records that * have timestamp <= {@code toTime}. * @param toTime The ending time point */ public MultiVersionedRangeQuery<K, V> asOftoTime(Instant toTime); /** * Specifies the overall order of returned records by timestamp */ public MultiVersionedRangeQuery<K, V> orderByTimestamp(); /** * Specifies the order of keys as descending. */ public MultiVersionedRangeQuery<K, V> withDescendingKeys(); /** * Specifies the order of the timestamps as descending. */ public MultiVersionedRangeQuery<K, V> withDescendingTimestamps(); /** * The lower bound of the query, if specified. 
*/ public Optional<K> lowerKeyBound(); /** * The upper bound of the query, if specified */ public Optional<K> upperKeyBound(); /** * The starting time point of the query, if specified */ public Optional<Instant> fromTime(); /** * The ending time point of the query, if specified */ public Optional<Instant> toTime(); /** * @return true if the query orders the returned records by key */ public boolean isOrderedByKey(); /** * @return true if the query returns records in ascending order of keys */ public boolean isKeyAscending(); /** * @return true if the query returns records in ascending order of timestamps */ public boolean isTimeAscending(); } |
...
The following example illustrates the use of the MultiVersionedRangeQuery class to query a versioned state store.
Imagine we have the following records:
put(1, 1, time=2023-01-01T10:00:00.00Z)
put(1, null, time=2023-01-05T10:00:00.00Z)
put(2, 20, time=2023-01-10T10:00:00.00Z)
put(3, 30, time=2023-01-12T10:00:00.00Z)
put(1, 2, time=2023-01-15T10:00:00.00Z)
put(1, 3, time=2023-01-20T10:00:00.00Z)
put(2, 30, time=2023-01-25T10:00:00.00Z)
Code Block | ||||
---|---|---|---|---|
| ||||
// example 1: MultiVersionedRangeQuery without specifying any time bound will be interpreted as all versions final MultiVersionedRangeQuery<Integer, Integer> query1 = MultiVersionedRangeQuery.withKeyRange(1, 2); final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 = inStore("my_store").withQuery(query1); final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 = kafkaStreams.query(request1); // Get the results from all partitions. final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults = versionedRangeResult.getPartitionResults(); for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) { try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) { while (iterator.hasNext()) { final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next(); Integer key = record.key; Integer value = record.value.value(); Long timestamp = record.value.timestamp(); Long validTo = record.value.validTo(); System.out.println ("key,value: " + key + "," +value + ", timestamp: " + Instant.ofEpochSecond(timestamp)+ ", valid till: " + Instant.ofEpochSecond(validTo)); } } } /* the printed output will be key,value: 1,1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now */ // example 2: The value of the records with key range (1,2) from 2023-01-17 Time: 10:00:00.00Z till 2023-01-30 T10:00:00.00Z final MultiVersionedRangeQuery<Integer, Integer> query2 = MultiVersionedRangeQuery.withKeyRange(1, 2); query2 = 
query2.fromTime(Instant.parse(2023-01-17T10:00:00.00Z)).toTime(Instant.parse(2023-01-30T10:00:00.00Z)).orderByTimestamp().withDescendingTimestamps(); final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 = inStore("my_store").withQuery(query2); final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 = kafkaStreams.query(request2); // Get the results from all partitions. final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 = versionedRangeResult2.getPartitionResults(); for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) { try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) { while (iterator.hasNext()) { final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next(); Integer key = record.key; Integer value = record.value.value(); Long timestamp = record.value.timestamp(); Long validTo = record.value.validTo(); System.out.println ("key,value: " + key + "," +value + ", timestamp: " + Instant.ofEpochSecond(timestamp)+ ", valid till: " + Instant.ofEpochSecond(validTo)); } } } /* the printed output will be value: 12,230, timestamp: 2023-01-15T1025T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Znow value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now value value: 1,2,30, timestamp: 2023-01-25T1015T10:00:00.00Z, valid till: now 2023-01-20T10:00:00.00Z */ |
Compatibility, Deprecation, and Migration Plan
...