THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
final MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(1).allVersions(from(Instant.parse("2023-08-03T10:37:30.00Z")); final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request = inStore("my_store").withQuery(query); final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult = kafkaStreams.query(request); // Get the results from all partitions. final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults = versionedKeyResult.getPartitionResults(); for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) { try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) { while (iterator.hasNext()) { final VersionedRecord<Integer> record = iterator.next(); Long timestamp = record.timestamp(); Integer value = record.value(); } } } |
...