THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
@Evolving public class RangeQuery<K, V> implements Query<V> { private final Optional<K> lower; private final Optional<K> upper; private RangeQuery(final Optional<K> lower, final Optional<K> upper) { this.lower = lower; this.upper = upper; } public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) { return new RangeQuery<>(Optional.of(lower), Optional.of(upper)); } public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) { return new RangeQuery<>(Optional.empty(), Optional.of(upper)); } public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) { return new RangeQuery<>(Optional.of(lower), Optional.empty()); } public static <K, V> RangeQuery<K, V> withNoBounds() { return new RangeQuery<>(Optional.empty(), Optional.empty()); } public Optional<K> getLowerBound() { return lower; } public Optional<K> getUpperBound() { return upper; } } // ====================================== // Range query example usage in IQv2: Integer key1 = 1; Integer key2 = 2; // create the query parameters final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes = kafkaStreams.serdesForStore("mystore") StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query = inStore("mystore") .withQuery(RangeQuery.withRange(Bytes.wrap(serdes.rawKey(key1)), Bytes.wrap(serdes.rawKey(key2)))); // run the query StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query); // Get the results from all partitions. 
final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
    result.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
    // try-with-resources closes each partition's iterator once it has been consumed.
    try (final KeyValueIterator<Bytes, byte[]> keyValueIterator = entry.getValue().getResult()) {
        while (keyValueIterator.hasNext()) {
            final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
            // Keys and values come back as raw bytes; deserialize them with the store's serdes.
            Integer key = serdes.keyFrom(next.key.get());
            // serdes is declared with ValueAndTimestamp<Integer> values, so unwrap the plain value.
            Integer value = serdes.valueFrom(next.value).value();
        }
    }
}

// ======================================
// Scan query example usage in IQv2:

// create the query parameters
StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query =
    inStore("mystore")
        .withQuery(RangeQuery.withNoBounds());

// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query); |
...