THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
Integer key1 = 1; Integer key2 = 2; long lowerBound = beginningOfDay(); long upperBound = now(); StateQueryRequest<KeyValueIterator<Windowed<Bytes>>StateQueryRequest<KeyValueIterator<Windowed<Bytes>, byte[]>>, byte[]> query = inStore("rocksdbsessionstore") .withQuery(WindowQuery.withRange(key1, key2, lowerBound, upperBound)); // run the query StateQueryResult<KeyValueIterator<IntegerStateQueryResult<KeyValueIterator<Windowed<Bytes>>, Integer>>byte[]> result = kafkaStreams.query(query); // Get the results from all partitions. final Map<Integer, QueryResult<KeyValueIterator<Windowed<Bytes>QueryResult<KeyValueIterator<Windowed<Bytes>>, Integer>>byte[]> partitionResults = rangeResult.getPartitionResults(); for (final Entry<Integer, QueryResult<KeyValueIterator<Windowed<Bytes>QueryResult<KeyValueIterator<Windowed<Bytes>>, byte[]>>> entry : partitionResults.entrySet()) { try (final KeyValueIterator<IntegerKeyValueIterator<Windowed<Bytes>>, Integer>byte[] keyValueIterator = entry.getValue().getResult()) { while (keyValueIterator.hasNext()) { final KeyValue<IntegerKeyValue<Bytes, Integer>byte[]> next = keyValueIterator.next(); IntegerBytes key = next.key.get; Integerbyte[] value = next.value; } } } |
...