THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.streams.query; /** * Interactive query for retrieving a set of records with keys within a specified key range and time * range. */ @Evolving public final class VersionedRangeQuery<KMultiVersionedRangeQuery<K, V> implements Query<KeyValueIterator<K, VersionedRecord<V>>> { private final Optional<K> lower; private final Optional<K> upper; private final Optional<Instant> fromTimestamp; private final Optional<Instant> asOfTimestamp; private final boolean isKeyAscending; private final boolean isTimeAscending; private final boolean isOrderedByKey; private VersionedRangeQueryMultiVersionedRangeQuery( final Optional<K> lower, final Optional<K> upper, final Optional<Instant> fromTimestamp, final Optional<Instant> asOfTimestamp, final boolean isOrderedByKey, final boolean isKeyAscending, final boolean isTimeAscending) { this.lower = lower; this.upper = upper; this.fromTimestamp = fromTimestamp; this.asOfTimestamp = asOfTimestamp; this.isOrderedByKey = isOrderedByKey; this.isKeyAscending = isKeyAscending; this.isTimeAscending = isTimeAscending; } /** * Interactive range query using a lower and upper bound to filter the keys returned. * For each * key the records valid within the specified time range are returned. * In case the time range is * not specified just the latest record for each key is returned. * @param lower The key that specifies the lower bound of the range * @param upper The key that specifies the upper bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> VersionedRangeQuery<KMultiVersionedRangeQuery<K, V> withRange(final K lower, final K upper); /** * Interactive range query using a lower bound to filter the keys returned. * For each key the * records valid within the specified time range are returned. * In case the time range is not * specified just the latest record for each key is returned. * @param lower The key that specifies the lower bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> VersionedRangeQuery<KMultiVersionedRangeQuery<K, V> withLowerBound(final K lower); /** * Interactive range query using a lower bound to filter the keys returned. * For each key the * records valid within the specified time range are returned. * In case the time range is not * specified just the latest record for each key is returned. * @param upper The key that specifies the lower bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> VersionedRangeQuery<KMultiVersionedRangeQuery<K, V> withUpperBound(final K upper); /** * Interactive scan query that returns all records in the store. * For each key the records valid * within the specified time range are returned. * In case the time range is not specified just * the latest record for each key is returned. * @param <K> The key type * @param <V> The value type */ public static <K, V> VersionedRangeQuery<KMultiVersionedRangeQuery<K, V> withNoBounds(); /** * Specifies the starting time point for the key query. The range query returns all the records * that are valid in the time range starting from the timestamp {@code fromTimestamp}. * @param fromTimestamp The starting time point */ public VersionedRangeQuery<KMultiVersionedRangeQuery<K, V> from(Instant fromTimestamp); /** * Specifies the ending time point for the key query. The range query returns all the records that * have timestamp <= {@code asOfTimestamp}. * @param asOfTimestamp The ending time point */ public VersionedRangeQuery<KMultiVersionedRangeQuery<K, V> asOf(Instant asOfTimestamp); /** * Specifies the starting and ending points of the range query as MIN and MAX respectively. * Therefore, the query returns all the existing records in the state store with keys within the * specified key range. * @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} have been already * specified. */ public VersionedRangeQuery<KMultiVersionedRangeQuery<K, V> allVersions(); /** * Specifies the overall order of returned records by timestamp */ public VersionedRangeQuery<KMultiVersionedRangeQuery<K, V> orderByTimestamp(); /** * Specifies the order of keys as descending. */ public VersionedRangeQuery<KMultiVersionedRangeQuery<K, V> withDescendingKeys(); /** * Specifies the order of the timestamps as descending. */ public VersionedRangeQuery<K, V> withDescendingTimestamps(); /** * The lower bound of the query, if specified. */ public Optional<K> getLowerKeyBound(); /** * The upper bound of the query, if specified */ public Optional<K> getUpperKeyBound(); /** * The starting time point of the query, if specified */ public Optional<Instant> getFromTimestamp(); /** * The ending time point of the query, if specified */ public Optional<Instant> getAsOfTimestamp(); /** * @return true if the query orders the returned records by key */ public boolean isOrderedByKey(); /** * @return true if the query returns records in ascending order of keys */ public boolean isKeyAscending(); /** * @return true if the query returns records in ascending order of timestamps */ public boolean isRangeAscending(); } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
final VersionedRangeQuery<IntegerMultiVersionedRangeQuery<Integer, Integer> query = VersionedRangeQueryMultiVersionedRangeQuery.withKeyRange(1, 2).from(Instant.parse(2023-08-03T10:37:30.00Z)).asOf(Instant.parse(2023-09-04T10:37:30.00Z)); final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request = inStore("my_store").withQuery(query); final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult = kafkaStreams.query(request); // Get the results from all partitions. final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults = versionedRangeResult.getPartitionResults(); for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) { try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) { while (iterator.hasNext()) { final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next(); Integer key = record.key; Integer value = record.value.value(); Long timestamp = record.value.timestamp(); } } } |
...