Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
linenumberstrue
// Build a multi-versioned query for key 1, bounded by the given instant.
// NOTE(review): parentheses are balanced here on the assumption that from(...) is an
// argument to allVersions(...) — confirm against the query API before publishing.
final MultiVersionedKeyQuery<Integer, Integer> query =
        MultiVersionedKeyQuery.withKey(1).allVersions(from(Instant.parse("2023-08-03T10:37:30.00Z")));

// Target the query at the store named "my_store".
final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request =
        inStore("my_store").withQuery(query);

final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult = kafkaStreams.query(request);

// Get the results from all partitions.
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults = versionedKeyResult.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    // try-with-resources closes the iterator once this partition's records have been read.
    try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            // Primitive long avoids boxing the record timestamp.
            final long timestamp = record.timestamp();
            final Integer value = record.value();
        }
    }
}

...