Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.

Imagine we have the following records 

put(1, 1, time=2023-01-01T10:00:00.00Z)

put(1, null, time=2023-01-05T10:00:00.00Z)

put(2, 20, time=2023-01-10T10:00:00.00Z)

put(3, 30, time=2023-01-12T10:00:00.00Z)

put(1, 2, time=2023-01-15T10:00:00.00Z)

put(1, 3, time=2023-01-20T10:00:00.00Z)

put(2, 30, time=2023-01-25T10:00:00.00Z)

Code Block
languagejava
linenumberstrue
// example 1: MultiVersionedRangeQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedRangeQuery<Integer, Integer> query1 =
        MultiVersionedRangeQuery.withKeyRange(1, 2);
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);
 
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 = kafkaStreams.query(request1);
 
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults = versionedRangeResult.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
          final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
          Integer key = record.key;
          Integer value = record.value.value();
          Long timestamp = record.value.timestamp();
		  Long validTo = record.value.validTo();
	      System.out.println ("key,value: " + key + "," +value + ", timestamp: " + Instant.ofEpochSecond(timestamp)+ ", valid till: " + Instant.ofEpochSecond(validTo));  	
        }
     }
}
/* the printed output will be
    key,value: 1,1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z  
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z 
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now 
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z 
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now 
*/

// example 2: The value of the records with key range (1,2) from 2023-01-17 Time: 10:00:00.00Z till 2023-01-30 T10:00:00.00Z


final MultiVersionedRangeQuery<Integer, Integer> query2 =
        MultiVersionedRangeQuery.withKeyRange(1, 2).fromTime(Instant.parse(2023-01-17T10:00:00.00Z)).toTime(Instant.parse(2023-01-30T10:00:00.00Z));
 
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);
 
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 = kafkaStreams.query(request2);
 
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 = versionedRangeResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
          final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
    
Code Block
languagejava
linenumberstrue
final MultiVersionedRangeQuery<Integer, Integer> query =
      Integer  MultiVersionedRangeQuery.withKeyRange(1, 2).from(Instant.parse(2023-08-03T10:37:30.00Z)).asOf(Instant.parse(2023-09-04T10:37:30.00Z));
 
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request =
key = record.key;
          Integer value  inStore("my_store").withQuery(query= record.value.value();
 
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult = kafkaStreams.query(request);
 
// Get the resultsLong fromtimestamp all= partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults = versionedRangeResult.getPartitionResultsrecord.value.timestamp();
 		  Long validTo = record.value.validTo();
for	 (final  Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
          final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
          Integer key = record.key;
          Integer value = record.value.value();
          Long timestamp = record.value.timestamp();
        }
     }
}System.out.println ("key,value: " + key + "," +value + ", timestamp: " + Instant.ofEpochSecond(timestamp)+ ", valid till: " + Instant.ofEpochSecond(validTo));           }
     }
}

/* the printed output will be     
    value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z 
    value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now      
*/


Compatibility, Deprecation, and Migration Plan

...

The range interactive queries will be tested in versioned stored IQv2 integration test (like non-versioned range queries). Moreover , there will be unit tests where ever needed.