THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
@Evolving public class RangeQuery<K, V> implements Query<V>Query<KeyValueIterator<K, V>> { private final Optional<K> lower; private final Optional<K> upper; private RangeQuery(final Optional<K> lower, final Optional<K> upper) { this.lower = lower; this.upper = upper; } public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) { return new RangeQuery<>(Optional.of(lower), Optional.of(upper)); } public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) { return new RangeQuery<>(Optional.empty(), Optional.of(upper)); } public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) { return new RangeQuery<>(Optional.of(lower), Optional.empty()); } public static <K, V> RangeQuery<K, V> withNoBounds() { return new RangeQuery<>(Optional.empty(), Optional.empty()); } public Optional<K> getLowerBound() { return lower; } public Optional<K> getUpperBound() { return upper; } } // ====================================== // Range query example usage in IQv2: Integer key1 = 1; Integer key2 = 2; // create the query parameters final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = kafkaStreams.serdesForStore("mystore") StateQueryRequest<KeyValueIterator<Integer, Integer>> query = inStore("mystore") .withQuery(RangeQuery.withRange(key1, key2)); // run the query StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query); // Get the results from all partitions. 
// The map is keyed by partition number; each entry holds that partition's query result.
final Map<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> partitionResults =
    result.getPartitionResults();

for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> entry : partitionResults.entrySet()) {
    // try-with-resources: the iterator must be closed once it has been consumed
    try (final KeyValueIterator<Integer, Integer> keyValueIterator = entry.getValue().getResult()) {
        while (keyValueIterator.hasNext()) {
            final KeyValue<Integer, Integer> next = keyValueIterator.next();
            Integer key = next.key;
            Integer value = next.value;
        }
    }
}

// ======================================
// Scan query example usage in IQv2:

// create the query parameters
StateQueryRequest<KeyValueIterator<Integer, Integer>> query =
    inStore("mystore")
        .withQuery(RangeQuery.withNoBounds());

// run the query
StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query); |
...