...
The following example illustrates the use of the MultiVersionedRangeQuery class to query a versioned state store.
Imagine we have the following records:
put(1, 1, time=2023-01-01T10:00:00.00Z)
put(1, null, time=2023-01-05T10:00:00.00Z)
put(2, 20, time=2023-01-10T10:00:00.00Z)
put(3, 30, time=2023-01-12T10:00:00.00Z)
put(1, 2, time=2023-01-15T10:00:00.00Z)
put(1, 3, time=2023-01-20T10:00:00.00Z)
put(2, 30, time=2023-01-25T10:00:00.00Z)
Code Block | ||||
---|---|---|---|---|
| ||||
// example 1: MultiVersionedRangeQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedRangeQuery<Integer, Integer> query1 =
    MultiVersionedRangeQuery.withKeyRange(1, 2);
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 =
    inStore("my_store").withQuery(query1);
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 = kafkaStreams.query(request1);
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults = versionedRangeResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    // The iterator is opened in try-with-resources so that it is closed once the partition has been read.
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            Integer key = record.key;
            Integer value = record.value.value();
            Long timestamp = record.value.timestamp();
            Long validTo = record.value.validTo();
            // Record timestamps are epoch milliseconds, hence Instant.ofEpochMilli.
            // NOTE(review): the expected output below shows "now" for the latest version of a key;
            // how validTo() represents an open-ended version is not shown here -- confirm.
            System.out.println ("key,value: " + key + "," +value + ", timestamp: " + Instant.ofEpochMilli(timestamp)+ ", valid till: " + Instant.ofEpochMilli(validTo));
        }
    }
}
/* the printed output will be
key,value: 1,1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z
key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
*/
// example 2: The value of the records with key range (1,2) from 2023-01-17T10:00:00.00Z till 2023-01-30T10:00:00.00Z
final MultiVersionedRangeQuery<Integer, Integer> query2 =
    MultiVersionedRangeQuery.withKeyRange(1, 2).fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-30T10:00:00.00Z"));
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 =
    inStore("my_store").withQuery(query2);
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 = kafkaStreams.query(request2);
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 = versionedRangeResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
    // The iterator is opened in try-with-resources so that it is closed once the partition has been read.
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            Integer key = record.key;
            Integer value = record.value.value();
            Long timestamp = record.value.timestamp();
            Long validTo = record.value.validTo();
            // Record timestamps are epoch milliseconds, hence Instant.ofEpochMilli.
            // NOTE(review): the expected output below shows "now" for the latest version of a key;
            // how validTo() represents an open-ended version is not shown here -- confirm.
            System.out.println ("key,value: " + key + "," +value + ", timestamp: " + Instant.ofEpochMilli(timestamp)+ ", valid till: " + Instant.ofEpochMilli(validTo));
        }
    }
}
/* the printed output will be
key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
NOTE(review): 1,2 is listed although its timestamp precedes fromTime, i.e. versions are selected when their
validity interval overlaps the queried time range; by the same reasoning 2,20 is listed as well -- confirm.
*/
Compatibility, Deprecation, and Migration Plan
...
The range interactive queries will be tested in the versioned store IQv2 integration tests (like the non-versioned range queries). Moreover, there will be unit tests wherever needed.