...
Code Block |
---|
language | java |
---|
firstline | 1 |
---|
title | ValueIterator.java |
---|
linenumbers | true |
---|
|
package org.apache.kafka.streams.state;
/**
* Iterator interface of {@link Value}.
* <p>
* Users must call its {@code close} method explicitly upon completeness to release resources,
* or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
* Note that {@code remove()} is not supported.
*
* @param <V> Type of values
*/
public interface ValueIterator<V> extends Iterator<V>, Closeable {
@Override
void close();
/**
* Peek the next value without advancing the iterator
* @return the value that would be returned from the next call to next
*/
V peek();
} |
VersionedKeyQuery MultiVersionedKeyQuery class
- The methods are composable. Therefore, the meaningless combinations such as withKey(k1).asOf(t1).allVersions() end up throwing a RunTimeException (for example NotSupportedException).
- Defining a query with time range (empty, t1] will be translated into [0, t1]
- Defining a query with time range (t1, empty) will be translated into [t1, MAX)
- A query with no specified time range will be interpreted as single-key_single-timestamp that returns the record with the latest timestamp.
- As explained in the javadocs, the query returns all valid records within the specified time range.
- The fromTimestamp specifies the starting point. There can be records which have been inserted before the fromTimestamp and are valid in the time range.
- The asOfTimestamp specifies the ending point. Records that have been inserted at asOfTimestamp are returned by the query as well.
- The order of the returned records is by default ascending by timestamp. The method withDescendingTimestamps() can reverse the order.
Code Block |
---|
language | java |
---|
firstline | 1 |
---|
title | VersionedKeyQueryMultiVersionedKeyQuery.java |
---|
linenumbers | true |
---|
|
package org.apache.kafka.streams.query;
/**
* Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
*/
@Evolving
public final class VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> {
private final K key;
private final Optional<Instant> fromTimestamp;
private final Optional<Instant> asOfTimestamp;
private final boolean isAscending;
private VersionedKeyQueryMultiVersionedKeyQuery(
final K key,
Optional<Instant> fromTimestamp,
Optional<Instant> asOfTimestamp,
boolean isAscending) {
this.key = Objects.requireNonNull(key);
this.fromTimestamp = fromTimestamp;
this.asOfTimestamp = asOfTimestamp;
this.isAscending = isAscending;
}
/**
* Creates a query that will retrieve the set of records identified by {@code key} if any exists
* (or {@code null} otherwise).
* @param key The key to retrieve
* @param <K> The type of the key
* @param <V> The type of the value that will be retrieved
*/
public static <K, V> VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> withKey(final K key);
/**
* Specifies the starting time point for the key query.
* The key query returns all the records that are valid in the time range starting from the timestamp {@code fromTimestamp}.
* @param fromTimestamp The starting time point
*/
public VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> from(Instant fromTimestamp);
/**
* Specifies the ending time point for the key query.
* The key query returns all the records that have timestamp <= {@code asOfTimestamp}.
* @param asOfTimestamp The ending time point
*/
public VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> asOf(Instant asOfTimestamp);
/**
* Specifies the starting and ending points of the key query as MIN and MAX respectively.
* Therefore, the query returns all the existing records in the state store with the specified key.
* @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} have been already
* specified.
*/
public VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> allVersions();
/**
* Specifies the order of the returned records by the query as descending by timestamp.
*/
public VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> withDescendingTimestamps();
/**
* The key that was specified for this query.
*/
public K getKey();
/**
* The starting time point of the query, if specified
*/
public Optional<Instant> getFromTimestamp();
/**
* The ending time point of the query, if specified
*/
public Optional<Instant> getAsOfTimestamp();
/**
* @return true if the query returns records in ascending order of timestamps
*/
public boolean isAscending ();
} |
...
Code Block |
---|
language | java |
---|
linenumbers | true |
---|
|
final VersionedKeyQuery<IntegerMultiVersionedKeyQuery<Integer, Integer> query = VersionedKeyQueryMultiVersionedKeyQuery.withKey(1).allVersions();
final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request =
inStore("my_store").withQuery(query);
final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult = kafkaStreams.query(request);
// Get the results from all partitions.
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults = versionedKeyResult.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
while (iterator.hasNext()) {
final VersionedRecord<Integer> record = iterator.next();
Long timestamp = record.timestamp();
Integer value = record.value();
}
}
} |
...