THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
/**
 * Query parameters for a key-range lookup: an optional lower bound and an
 * optional upper bound on the key.
 *
 * <p>Instances are immutable and are created through {@link #withRange(Optional, Optional)}.
 * Passing two empty bounds is how a full scan is expressed.
 * NOTE(review): whether the bounds are inclusive is decided by the store that
 * executes the query, not by this class — confirm against the store implementation.
 *
 * @param <K> type of the keys used as range bounds
 * @param <V> result type of the query, as declared by {@code Query<V>}
 */
@Evolving
public class RangeQuery<K, V> implements Query<V> {

    private final Optional<K> lower;
    private final Optional<K> upper;

    private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
        // Reject null here so the getters never hand out a null Optional.
        this.lower = java.util.Objects.requireNonNull(lower, "lower");
        this.upper = java.util.Objects.requireNonNull(upper, "upper");
    }

    /**
     * Creates a range query with the given bounds.
     *
     * @param lower lower key bound, or {@code Optional.empty()} for no lower bound; must not be {@code null}
     * @param upper upper key bound, or {@code Optional.empty()} for no upper bound; must not be {@code null}
     * @param <K>   type of the keys used as range bounds
     * @param <V>   result type of the query
     * @return a new query holding the two bounds
     * @throws NullPointerException if either argument is {@code null}
     */
    public static <K, V> RangeQuery<K, V> withRange(final Optional<K> lower, final Optional<K> upper) {
        return new RangeQuery<>(lower, upper);
    }

    /**
     * @return the lower key bound; empty if the range has no lower bound
     */
    public Optional<K> getLowerBound() {
        return lower;
    }

    /**
     * @return the upper key bound; empty if the range has no upper bound
     */
    public Optional<K> getUpperBound() {
        return upper;
    }
}
// ======================================
// Range query example usage in IQv2:
Integer key1 = 1;
Integer key2 = 2;

// Serdes of the queried store: used to turn the typed bounds into raw bytes
// for the request and to decode the raw results afterwards.
final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
    kafkaStreams.serdesForStore("mystore");

// create the query parameters
StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query =
    inStore("mystore")
        .withQuery(RangeQuery.withRange(
            Optional.of(Bytes.wrap(serdes.rawKey(key1))),
            Optional.of(Bytes.wrap(serdes.rawKey(key2)))));

// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
    result.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> entry : partitionResults.entrySet()) {
    // The iterator is opened in try-with-resources so it is closed for every partition.
    try (final KeyValueIterator<Bytes, byte[]> keyValueIterator = entry.getValue().getResult()) {
        while (keyValueIterator.hasNext()) {
            final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
            Integer key = serdes.keyFrom(next.key.get());
            // The serdes decodes to ValueAndTimestamp<Integer>; unwrap it to get the plain value.
            Integer value = serdes.valueFrom(next.value).value();
        }
    }
}
// ======================================
// Scan query example usage in IQv2:
// create the query parameters
StateQueryRequest<KeyValueIterator<Bytes, byte[]>> query =
inStore("mystore")
.withQuery(RangeQuery.withRange(Optional.empty())),
Optional.empty())));
// run the query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> result = kafkaStreams.query(query);
|
Compatibility, Deprecation, and Migration Plan
...