THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- The methods are composable. Therefore, the meaningless combinations such as withKey(k1).asOf(t1).allVersions() end up throwing a RunTimeException (for example NotSupportedException).
- Defining a query with time range (empty, t1] will be translated into [0, t1]
- Defining a query with time range (t1, empty) will be translated into [t1, MAX)
- A query with no specified time range will be interpreted as single-key_single-timestamp that returns the record with the latest timestamp.
- As explained in the javadocs, the query returns all valid records within the specified time range.
- The fromTimestamp specifies the starting point. There can be records which have been inserted before the fromTimestamp and are valid in the time range.
- The asOfTimestamp specifies the ending point. Records that have been inserted at asOfTimestamp are returned by the query as well.
- The order of the returned records is by default Ascending by timestamp. The method withDescendingTimestamps() can reverse the order.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package org.apache.kafka.streams.query; /** * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range. */ @Evolving public final class VersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> { private final K key; private final Optional<Instant> fromTimestamp; private final Optional<Instant> asOfTimestamp; private final boolean isAscending; private VersionedKeyQuery( final K key, Optional<Instant> fromTimestamp, Optional<Instant> asOfTimestamp, boolean isAscending) { this.key = Objects.requireNonNull(key); this.fromTimestamp = fromTimestamp; this.asOfTimestamp = asOfTimestamp; this.isAscending = isAscending; } /** * Creates a query that will retrieve the set of records identified by {@code key} if any exists * (or {@code null} otherwise). * @param key The key to retrieve * @param <K> The type of the key * @param <V> The type of the value that will be retrieved */ public static <K, V> VersionedKeyQuery<K, V> withKey(final K key); /** * Specifies the starting time point for the key query. * The key query returns all the records that are valid in the time range starting from the timestamp {@code fromTimestamp}. * @param fromTimestamp The starting time point */ public VersionedKeyQuery<K, V> from(Instant fromTimestamp); /** * Specifies the ending time point for the key query. * The key query returns all the records that have timestamp <= {@code asOfTimestamp}. * @param asOfTimestamp The ending time point */ public VersionedKeyQuery<K, V> asOf(Instant asOfTimestamp); /** * Specifies the starting and ending points of the key query as MIN and MAX respectively. * Therefore, the query returns all the existing records in the state store with the specified key. * @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} have been already * specified. */ public VersionedKeyQuery<K, V> allVersions(); /** * Specifies the order of the returned records by the query as descending by timestamp. */ public VersionedKeyQuery<K, V> withDescendingTimestamps(); /** * The key that was specified for this query. */ public K getKey(); /** * The starting time point of the query, if specified */ public Optional<Instant> getFromTimestamp(); /** * The ending time point of the query, if specified */ public Optional<Instant> getAsOfTimestamp(); /** * @return true if the query returns records in ascending order of timestamps */ public boolean isAscending (); } |
...