Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


Code Block
package org.apache.kafka.streams.state;

 * Iterator interface of {@link Value}.
 * <p>
 * Users must call its {@code close} method explicitly upon completeness to release resources,
 * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
 * Note that {@code remove()} is not supported.
 * @param <V> Type of values
public interface ValueIterator<V> extends Iterator<V>, Closeable {

    void close();

     * Peek the next value without advancing the iterator
     * @return the value that would be returned from the next call to next
    V peek();

VersionedKeyQuery MultiVersionedKeyQuery class

  • The methods are composable. Therefore, the meaningless combinations such as withKey(k1).asOf(t1).allVersions() end up throwing a RunTimeException (for example NotSupportedException).
    • Defining a query with time range (empty, t1] will be translated into [0, t1]
    • Defining a query with time range (t1, empty) will be translated into [t1, MAX)
    • A query with no specified time range will be interpreted as single-key_single-timestamp that returns the record with the latest timestamp.
  • As explained in the javadocs, the query returns all valid records within the specified time range.
    • The fromTimestamp specifies the starting point. There can be records which have been inserted before the fromTimestamp and are valid in the time range. 
    • The asOfTimestamp specifies the ending point. Records that have been inserted at asOfTimestamp are returned by the query as well.
  • The order of the returned records is by default ascending by timestamp. The method withDescendingTimestamps() can reverse the order.
Code Block
package org.apache.kafka.streams.query;

 * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
public final class VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> {

  private final K key;
  private final Optional<Instant> fromTimestamp;
  private final Optional<Instant> asOfTimestamp;
  private final boolean isAscending;

  private VersionedKeyQueryMultiVersionedKeyQuery(
      final K key,
      Optional<Instant> fromTimestamp,
      Optional<Instant> asOfTimestamp,
      boolean isAscending) {
    this.key = Objects.requireNonNull(key);
    this.fromTimestamp = fromTimestamp;
    this.asOfTimestamp = asOfTimestamp;
    this.isAscending = isAscending;

   * Creates a query that will retrieve the set of records identified by {@code key} if any exists
   * (or {@code null} otherwise).
   * @param key The key to retrieve
   * @param <K> The type of the key
   * @param <V> The type of the value that will be retrieved
  public static <K, V> VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> withKey(final K key);

   * Specifies the starting time point for the key query.
   * The key query returns all the records that are valid in the time range starting from the timestamp {@code fromTimestamp}.
   * @param fromTimestamp The starting time point
  public VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> from(Instant fromTimestamp);

   * Specifies the ending time point for the key query.
   * The key query returns all the records that have timestamp <= {@code asOfTimestamp}.
   * @param asOfTimestamp The ending time point
  public VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> asOf(Instant asOfTimestamp);

   * Specifies the starting and ending points of the key query as MIN and MAX respectively.
   * Therefore, the query returns all the existing records in the state store with the specified key.
   * @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} have been already
   * specified.
  public VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> allVersions();

   * Specifies the order of the returned records by the query as descending by timestamp.
  public VersionedKeyQuery<KMultiVersionedKeyQuery<K, V> withDescendingTimestamps();

   * The key that was specified for this query.
  public K getKey();

   * The starting time point of the query, if specified
  public Optional<Instant> getFromTimestamp();

   * The ending time point of the query, if specified
  public Optional<Instant> getAsOfTimestamp();

   * @return true if the query returns records in ascending order of timestamps
  public boolean isAscending ();


Code Block
final VersionedKeyQuery<IntegerMultiVersionedKeyQuery<Integer, Integer> query = VersionedKeyQueryMultiVersionedKeyQuery.withKey(1).allVersions();

final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request =

final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult = kafkaStreams.query(request);

// Get the results from all partitions.
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults = versionedKeyResult.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
	try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
          final VersionedRecord<Integer> record =;
          Long timestamp = record.timestamp();
		  Integer value = record.value();	
