THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
private RawScanQuery() { }
public static RawScanQuery scan() {
return new RawScanQuery();
}
}
// example usage
// since we're handling raw data, we're going to need the serdes
// this is just an example, this method is also not proposed in this KIP.
StateQuerySerdes<Integer, ValueAndTimestamp<Integer>> serdes =
kafkaStreams.serdesForStore("mystore");
// run the "scan" query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
kafkaStreams.query(inStore("mystore").withQuery(RawScanQuery.scan()));
// This time, we'll get results from all locally available partitions.
Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
scanResult.getPartitionResults();
// for this example, we'll just collate all the partitions' iterators
// together and print their data
List<KeyValueIterator<Bytes, byte[]>> iterators =
partitionResults
.values()
.stream()
.map(QueryResult::getResult)
.collect(Collectors.toList());
// Using an example convenience method we could add to collate iterators' data
try (CloseableIterator<KeyValue<Bytes, byte[]>> collated = Iterators.collate(collect)) {
while(collate.hasNext()) {
KeyValue<Bytes, byte[]> next = collated.next();
System.out.println(
"|||" +
" " + serdes.keyFrom(next.key.get()) +
" " + serdes.valueFrom(next.value)
);
}
} |
...