THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.
Imagine we have the following records
put(1, 1, time=2023-01-01T10:00:00.00Z)
put(1, null, time=2023-01-05T10:00:00.00Z)
put(1, null, time=2023-01-10T10:00:00.00Z)
put(1, 2, time=2023-01-20T10:00:00.00Z)
Code Block | ||||
---|---|---|---|---|
| ||||
// example 1: MultiVersionedKeyQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedKeyQuery<Integer, Integer> query1 = MultiVersionedKeyQuery.withKey(1);
final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request1 =
inStore("my_store").withQuery(query1);
final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult1 = kafkaStreams.query(request1);
// Get the results from all partitions
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults1 = versionedKeyResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) {
try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
while (iterator.hasNext()) {
final VersionedRecord<Integer> record = iterator.next();
Long timestamp = record.timestamp();
Long validTo = record.validTo();
Integer value = record.value();
System.out.println ("value: " + value + ", timestamp: " + Instant.ofEpochSecond(timestamp)+ ", valid till: " + Instant.ofEpochSecond(validTo));
}
}
}
/* the printed output will be
value: 1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z
value: 2, timestamp: 2023-01-020T10:00:00.00Z, valid till: now
*/
// example 2 | ||||
Code Block | ||||
| ||||
final MultiVersionedKeyQuery<Integer, Integer> query1 = MultiVersionedKeyQuery.withKey(1); final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request1 = inStore("my_store").withQuery(query1); final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult1 = kafkaStreams.query(request1); // Get the results from all partitions. final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults1 = versionedKeyResult1.getPartitionResults(); for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) { try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) { while (iterator.hasNext()) { final VersionedRecord<Integer> record = iterator.next(); Long timestamp = record.timestamp(); Integer value = record.value(); } } } final MultiVersionedKeyQuery<Integer, Integer> queryquery2 = MultiVersionedKeyQuery.withKey(1).from(Instant.parse("2023-0801-03T1001T10:3700:3000.00Z")); |
Compatibility, Deprecation, and Migration Plan
...