THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- we propose a public class MultiVersionedKeyQuery.
- Moreover, the public interface ValueIteratorVersionedRecordIterator is added to iterate over different values that are returned from a single-key query (each value corresponds to a timestamp).
- In addition, a new method is added to the VersionedKeyValueStore interface to support single-key_multi-timestamp queries.
- Finally, a field called validTo is added to the VersionedRecord class to enable us representing tombstones as well.
...
Code Block | ||||
---|---|---|---|---|
| ||||
// example 1: MultiVersionedKeyQuery without specifying any time bound will be interpreted as all versions final MultiVersionedKeyQuery<Integer, Integer> query1 = MultiVersionedKeyQuery.withKey(1); final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>>StateQueryRequest<VersionedRecordIterator<Integer>> request1 = StateQueryRequest.inStore("my_store").withQuery(query1); final StateQueryResult<ValueIterator<VersionedRecord<Integer>>>StateQueryResult<VersionedRecordIterator<Integer>> versionedKeyResult1 = kafkaStreams.query(request1); // Get the results from all partitions final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>>QueryResult<VersionedRecordIterator<Integer>>> partitionResults1 = versionedKeyResult1.getPartitionResults(); for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>>QueryResult<VersionedRecordIterator<Integer>>> entry : partitionResults1.entrySet()) { try (final ValueIterator<VersionedRecord<Integer>>VersionedRecordIterator<Integer> iterator = entry.getValue().getResult()) { while (iterator.hasNext()) { final VersionedRecord<Integer> record = iterator.next(); Long timestamp = record.timestamp(); Long validTo = record.validTo(); Integer value = record.value(); System.out.println ("value: " + value + ", timestamp: " + Instant.ofEpochSecond(timestamp)+ ", valid till: " + Instant.ofEpochSecond(validTo)); } } } /* the printed output will be value: 1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z value: 2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z value: 3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now */ // example 2: The value of the record with key=1 from 2023-01-17 Time: 10:00:00.00Z till 2023-01-25 T10:00:00.00Z MultiVersionedKeyQuery<Integer, Integer> query2 = MultiVersionedKeyQuery.withKey(1); query2 = query2.fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-25T10:00:00.00Z")) final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>>StateQueryRequest<VersionedRecordIterator<Integer>> request2 = StateQueryRequest.inStore("my_store").withQuery(query2); final StateQueryResult<ValueIterator<VersionedRecord<Integer>>>StateQueryResult<VersionedRecordIterator<Integer>> versionedKeyResult2 = kafkaStreams.query(request2); // Get the results from all partitions final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>>QueryResult<VersionedRecordIterator<Integer>>> partitionResults2 = versionedKeyResult2.getPartitionResults(); for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>>QueryResult<VersionedRecordIterator<Integer>>> entry : partitionResults2.entrySet()) { try (final ValueIterator<VersionedRecord<Integer>>VersionedRecordIterator<Integer> iterator = entry.getValue().getResult()) { while (iterator.hasNext()) { final VersionedRecord<Integer> record = iterator.next(); Long timestamp = record.timestamp(); Long validTo = record.validTo(); Integer value = record.value(); System.out.println ("value: " + value + ", timestamp: " + Instant.ofEpochSecond(timestamp)+ ", valid till: " + Instant.ofEpochSecond(validTo)); } } } /* the printed output will be value: 2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z value: 3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now */ |
...