Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
Integer key1 = 1;
Integer key2 = 2;
long lowerBound = beginningOfDay();
long upperBound = now();
 
StateQueryRequest<KeyValueIterator<Windowed<Bytes>>StateQueryRequest<KeyValueIterator<Windowed<Bytes>, byte[]>>, byte[]> query =
  inStore("rocksdbsessionstore")
  .withQuery(WindowQuery.withRange(key1, key2, lowerBound, upperBound));
       
// run the query
StateQueryResult<KeyValueIterator<IntegerStateQueryResult<KeyValueIterator<Windowed<Bytes>>, Integer>>byte[]> result = kafkaStreams.query(query);
  
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Windowed<Bytes>QueryResult<KeyValueIterator<Windowed<Bytes>>, Integer>>byte[]> partitionResults = rangeResult.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Windowed<Bytes>QueryResult<KeyValueIterator<Windowed<Bytes>>, byte[]>>> entry : partitionResults.entrySet()) {
    try (final KeyValueIterator<IntegerKeyValueIterator<Windowed<Bytes>>, Integer>byte[] keyValueIterator = entry.getValue().getResult()) {
        while (keyValueIterator.hasNext()) {
            final KeyValue<IntegerKeyValue<Bytes, Integer>byte[]> next = keyValueIterator.next();
            IntegerBytes key = next.key.get;
            Integerbyte[] value = next.value;
        }
     }
} 

...