Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
linenumberstrue
// example 1: MultiVersionedKeyQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedKeyQuery<Integer, Integer> query1 = MultiVersionedKeyQuery.withKey(1);

final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);

final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult1 = kafkaStreams.query(request1);

// Get the results from all partitions
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults1 = versionedKeyResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) {
	try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
          final VersionedRecord<Integer> record = iterator.next();
          Long timestamp = record.timestamp();
          Long validTo = record.validTo();
          Integer value = record.value();	
		  System.out.println ("value: " + value + ", timestamp: " + Instant.ofEpochSecond(timestamp)+ ", valid till: " + Instant.ofEpochSecond(validTo));	
        }
     }
}  
/* the printed output will be
	value: 1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z  
	value: 2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z  
	value: 3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now  
*/

// example 2
final : The value of the record with key=1 from 2023-01-17 Time: 10:00:00.00Z till 2023-01-25 T10:00:00.00Z
 
MultiVersionedKeyQuery<Integer, Integer> query2 = MultiVersionedKeyQuery.withKey(1);
query2 = query2.from(Instant.parse("2023-01-17T10:00:00.00Z")).asOf(Instant.parse("2023-01-01T1025T10:00:00.00Z"));

final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);

final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult2 = kafkaStreams.query(request2);

// Get the results from all partitions
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults2 = versionedKeyResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
	try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
          final VersionedRecord<Integer> record = iterator.next();
          Long timestamp = record.timestamp();
          Long validTo = record.validTo();
          Integer value = record.value();	
		  System.out.println ("value: " + value + ", timestamp: " + Instant.ofEpochSecond(timestamp)+ ", valid till: " + Instant.ofEpochSecond(validTo));	
        }
     }
}  
/* the printed output will be   	
	value: 12, timestamp: 2023-01-01T1015T10:00:00.00Z, valid till: 2023-01-05T1020T10:00:00.00Z   
	value: 23, timestamp: 2023-01-020T1020T10:00:00.00Z, valid till: now    
*/   

Compatibility, Deprecation, and Migration Plan

...