Status

Current state: Under Discussion

Discussion thread: 

JIRA: KAFKA-15347 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The main goal is to support interactive queries (IQs) in the presence of versioned state stores (KIP-889) in AK. This KIP is the successor of KIP-960 and discusses single-key, multi-timestamp queries. Other types of IQs are covered in a separate KIP (KIP-969).


Key Queries with multiple timestamps:

  1. single-key query with upper bound timestamp
  2. single-key query with lower bound timestamp
  3. single-key query with timestamp range
  4. single-key all versions query

Public Interfaces

In this KIP we propose a new public class, MultiVersionedKeyQuery. In addition, the public interface ValueIterator is used to iterate over the values returned from a single-key query (each value corresponds to a timestamp).

Proposed Changes

So that tombstones can be listed, the VersionedRecord class will also accept null values.

For single-key queries, MultiVersionedKeyQuery and ValueIterator classes will be used.

ValueIterator.java
package org.apache.kafka.streams.state;

import java.io.Closeable;
import java.util.Iterator;

/**
 * Iterator interface of {@link Value}.
 * <p>
 * Users must call its {@code close} method explicitly upon completeness to release resources,
 * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
 * Note that {@code remove()} is not supported.
 *
 * @param <V> Type of values
 */
public interface ValueIterator<V> extends Iterator<V>, Closeable {

    @Override
    void close();

    /**
     * Peek the next value without advancing the iterator
     * @return the value that would be returned from the next call to next
     */
    V peek();
}
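
The contract of peek() can be illustrated with a small list-backed implementation. This is only a sketch: ListValueIterator is a hypothetical class that is not part of this KIP, the interface is redeclared locally so the snippet compiles on its own, and throwing NoSuchElementException from peek() on an exhausted or closed iterator is an assumption, since the KIP does not specify that case.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Local copy of the proposed interface so that the sketch is self-contained.
interface ValueIterator<V> extends Iterator<V>, Closeable {
    @Override
    void close();

    V peek();
}

// Hypothetical list-backed implementation, for illustration only.
final class ListValueIterator<V> implements ValueIterator<V> {
    private final List<V> values;
    private int position = 0;
    private boolean closed = false;

    ListValueIterator(final List<V> values) {
        this.values = new ArrayList<>(values);
    }

    @Override
    public boolean hasNext() {
        return !closed && position < values.size();
    }

    @Override
    public V next() {
        // Reuse peek() for the bounds check, then advance.
        final V value = peek();
        position++;
        return value;
    }

    @Override
    public V peek() {
        // Assumption: peeking past the end (or after close) fails like next() would.
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return values.get(position);
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

Calling peek() repeatedly returns the same element until next() is called, which lets a caller inspect the upcoming version before deciding whether to consume it.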

MultiVersionedKeyQuery class

  • The methods are composable. Meaningless combinations such as withKey(k1).asOf(t1).allVersions() therefore throw a RuntimeException (for example, UnsupportedOperationException).
    • A query defined with the time range (empty, t1] is translated into [0, t1].
    • A query defined with the time range (t1, empty) is translated into [t1, MAX).
    • A query with no specified time range is interpreted as a single-key, single-timestamp query that returns the record with the latest timestamp.
  • As explained in the javadocs, the query returns all valid records within the specified time range.
    • The fromTimestamp specifies the starting point. There can be records which have been inserted before the fromTimestamp and are valid in the time range. 
    • The asOfTimestamp specifies the ending point. Records that have been inserted at asOfTimestamp are returned by the query as well.
  • The order of the returned records is by default ascending by timestamp. The method withDescendingTimestamps() can reverse the order.
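
The time-range rules in the list above can be sketched against a plain in-memory history of one key. This is an illustrative model and not the store implementation: TimeRangeSketch, Version, and select are hypothetical names, timestamps are plain long values, and it is assumed that a record stays valid until the timestamp of the next record for the same key. Tombstones are not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

final class TimeRangeSketch {

    // Hypothetical stand-in for a versioned record: a value plus its insertion timestamp.
    static final class Version {
        final long timestamp;
        final String value;

        Version(final long timestamp, final String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // history must be sorted by ascending timestamp.
    static List<Version> select(final List<Version> history,
                                final OptionalLong from,
                                final OptionalLong asOf,
                                final boolean ascending) {
        final List<Version> result = new ArrayList<>();
        if (history.isEmpty()) {
            return result;
        }
        // No range at all: only the record with the latest timestamp.
        if (!from.isPresent() && !asOf.isPresent()) {
            result.add(history.get(history.size() - 1));
            return result;
        }
        final long lower = from.orElse(0L);             // (empty, t1] becomes [0, t1]
        final long upper = asOf.orElse(Long.MAX_VALUE); // (t1, empty) becomes [t1, MAX)
        for (int i = 0; i < history.size(); i++) {
            final Version version = history.get(i);
            // Assumption: a record is valid until the next record's timestamp.
            final long validTo = i + 1 < history.size()
                ? history.get(i + 1).timestamp
                : Long.MAX_VALUE;
            final boolean insertedInTime = version.timestamp <= upper;
            final boolean stillValid = validTo > lower;
            if (insertedInTime && stillValid) {
                result.add(version);
            }
        }
        if (!ascending) {
            Collections.reverse(result);
        }
        return result;
    }
}
```

With records inserted at timestamps 10, 20, and 30, a range starting at 15 still includes the record inserted at 10, because that record remains valid until 20.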
MultiVersionedKeyQuery.java
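package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.ValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;
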
/**
 * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
 */
@Evolving
public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> {

  private final K key;
  private final Optional<Instant> fromTimestamp;
  private final Optional<Instant> asOfTimestamp;
  private final boolean isAscending;

  private MultiVersionedKeyQuery(
      final K key,
      final Optional<Instant> fromTimestamp,
      final Optional<Instant> asOfTimestamp,
      final boolean isAscending) {
    this.key = Objects.requireNonNull(key);
    this.fromTimestamp = fromTimestamp;
    this.asOfTimestamp = asOfTimestamp;
    this.isAscending = isAscending;
  }

  /**
   * Creates a query that will retrieve the set of records identified by {@code key} if any exists
   * (or {@code null} otherwise).
   * @param key The key to retrieve
   * @param <K> The type of the key
   * @param <V> The type of the value that will be retrieved
   */
  public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key);

  /**
   * Specifies the starting time point for the key query.
   * The key query returns all the records that are valid in the time range starting from the timestamp {@code fromTimestamp}.
   * @param fromTimestamp The starting time point
   */
  public MultiVersionedKeyQuery<K, V> from(Instant fromTimestamp);

  /**
   * Specifies the ending time point for the key query.
   * The key query returns all the records that have timestamp <= {@code asOfTimestamp}.
   * @param asOfTimestamp The ending time point
   */
  public MultiVersionedKeyQuery<K, V> asOf(Instant asOfTimestamp);

  /**
   * Specifies the starting and ending points of the key query as MIN and MAX respectively.
   * Therefore, the query returns all the existing records in the state store with the specified key.
   * @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} have been already
   * specified.
   */
  public MultiVersionedKeyQuery<K, V> allVersions();

  /**
   * Specifies the order of the returned records by the query as descending by timestamp.
   */
  public MultiVersionedKeyQuery<K, V> withDescendingTimestamps();

  /**
   * The key that was specified for this query.
   */
  public K getKey();

  /**
   * The starting time point of the query, if specified
   */
  public Optional<Instant> getFromTimestamp();

  /**
   * The ending time point of the query, if specified
   */
  public Optional<Instant> getAsOfTimestamp();

  /**
   * @return true if the query returns records in ascending order of timestamps
   */
  public boolean isAscending();
}
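
One possible way to enforce the composability rule for allVersions() is sketched below. KeyQuerySketch is a hypothetical, simplified stand-in for the proposed class (it omits the value type and the ordering flag). Two details are assumptions: the KIP only requires a RuntimeException, so UnsupportedOperationException is one possible choice, and MIN and MAX are represented here as epoch millisecond 0 and Long.MAX_VALUE.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

// Hypothetical, simplified stand-in for MultiVersionedKeyQuery.
final class KeyQuerySketch<K> {
    private final K key;
    private final Optional<Instant> fromTimestamp;
    private final Optional<Instant> asOfTimestamp;

    private KeyQuerySketch(final K key,
                           final Optional<Instant> fromTimestamp,
                           final Optional<Instant> asOfTimestamp) {
        this.key = Objects.requireNonNull(key);
        this.fromTimestamp = fromTimestamp;
        this.asOfTimestamp = asOfTimestamp;
    }

    static <K> KeyQuerySketch<K> withKey(final K key) {
        return new KeyQuerySketch<>(key, Optional.empty(), Optional.empty());
    }

    // Each builder method returns a new immutable instance.
    KeyQuerySketch<K> from(final Instant fromTimestamp) {
        return new KeyQuerySketch<>(key, Optional.of(fromTimestamp), asOfTimestamp);
    }

    KeyQuerySketch<K> asOf(final Instant asOfTimestamp) {
        return new KeyQuerySketch<>(key, fromTimestamp, Optional.of(asOfTimestamp));
    }

    KeyQuerySketch<K> allVersions() {
        // Reject combinations such as withKey(k1).asOf(t1).allVersions().
        if (fromTimestamp.isPresent() || asOfTimestamp.isPresent()) {
            throw new UnsupportedOperationException(
                "allVersions() cannot be combined with from() or asOf()");
        }
        // Assumption: MIN and MAX expressed in epoch milliseconds.
        return new KeyQuerySketch<>(
            key,
            Optional.of(Instant.ofEpochMilli(0L)),
            Optional.of(Instant.ofEpochMilli(Long.MAX_VALUE)));
    }

    K getKey() {
        return key;
    }

    Optional<Instant> getFromTimestamp() {
        return fromTimestamp;
    }

    Optional<Instant> getAsOfTimestamp() {
        return asOfTimestamp;
    }
}
```

Returning a new instance from each method keeps a partially built query safe to reuse, at the cost of a small allocation per call.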

Examples

The following example illustrates the use of the MultiVersionedKeyQuery class to query a versioned state store.

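// The explicit type arguments are needed because V cannot be inferred through the method chain.
final MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.<Integer, Integer>withKey(1).allVersions();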

final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request =
        inStore("my_store").withQuery(query);

final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult = kafkaStreams.query(request);

// Get the results from all partitions.
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults = versionedKeyResult.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    // try-with-resources closes the iterator and releases its resources.
    try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            final long timestamp = record.timestamp();
            final Integer value = record.value();
        }
    }
}

Compatibility, Deprecation, and Migration Plan

  • Since this is a completely new set of APIs, no backward compatibility concerns are anticipated. 
  • Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.