Status

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-15347 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The main goal is to support interactive queries (IQ) on versioned state stores (KIP-889) in Apache Kafka (AK). This KIP is the successor of KIP-960 and covers single-key, multi-timestamp queries. Other types of IQs are covered in the next KIP (KIP-969).


Key Queries with multiple timestamps:

  1. single-key query with upper bound timestamp
  2. single-key query with lower bound timestamp
  3. single-key query with timestamp range
  4. single-key all versions query

Public Interfaces

In this KIP,

Proposed Changes

To be able to list the tombstones, the validTo field is added to the VersionedRecord class. The default value of validTo is positive infinity, represented as Long.MAX_VALUE.

package org.apache.kafka.streams.state;

public final class VersionedRecord<V> {

    /**
     * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
     *
     * @param value      the value
     * @param timestamp  the timestamp
     */
    public VersionedRecord(final V value, final long timestamp) {
        this.value = Objects.requireNonNull(value);
        this.timestamp = timestamp;
        this.validTo = Long.MAX_VALUE;
    }

    /**
     * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
     *
     * @param value      the value
     * @param timestamp  the timestamp
     * @param validTo    the latest timestamp at which the value is valid
     */
    public VersionedRecord(final V value, final long timestamp, final long validTo);

    /**
     * Returns the {@code validTo} timestamp of this record.
     */
    public long validTo();
}
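
To make the role of validTo concrete, here is a minimal, self-contained sketch of how a validity end could be derived from a sequence of puts for one key. It is not the Kafka Streams implementation: `ValidToSketch`, `Put`, and `Version` are hypothetical stand-ins, and the rule used (a version's validTo is the timestamp of the next put, whether a value or a tombstone, and Long.MAX_VALUE if there is none) is an assumption read off the Examples section of this KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT the Kafka Streams classes.
final class ValidToSketch {

    /** One put for a single key; a null value models a tombstone. */
    static final class Put {
        final Integer value;
        final long timestamp;

        Put(final Integer value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** A non-null version together with the end of its validity. */
    static final class Version {
        final int value;
        final long timestamp;
        final long validTo;

        Version(final int value, final long timestamp, final long validTo) {
            this.value = value;
            this.timestamp = timestamp;
            this.validTo = validTo;
        }
    }

    /**
     * Derives the versions from puts sorted by ascending timestamp.
     * Assumption: validTo is the timestamp of the next put (value or tombstone),
     * or Long.MAX_VALUE if no later put exists.
     */
    static List<Version> versions(final List<Put> putsAscending) {
        final List<Version> result = new ArrayList<>();
        for (int i = 0; i < putsAscending.size(); i++) {
            final Put put = putsAscending.get(i);
            if (put.value == null) {
                continue; // a tombstone only ends the previous version; it is not emitted itself
            }
            final long validTo = i + 1 < putsAscending.size()
                ? putsAscending.get(i + 1).timestamp
                : Long.MAX_VALUE;
            result.add(new Version(put.value, put.timestamp, validTo));
        }
        return result;
    }

    public static void main(final String[] args) {
        final List<Put> puts = new ArrayList<>();
        puts.add(new Put(1, 10L));
        puts.add(new Put(null, 50L));
        puts.add(new Put(null, 100L));
        puts.add(new Put(2, 150L));
        puts.add(new Put(3, 200L));
        for (final Version v : versions(puts)) {
            System.out.println("value=" + v.value + " timestamp=" + v.timestamp + " validTo=" + v.validTo);
        }
    }
}
```

With the five puts in `main`, the sketch yields three versions: value 1 valid until 50, value 2 valid until 200, and value 3 with validTo equal to Long.MAX_VALUE. The two consecutive tombstones produce no records of their own.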


For single-key queries, the MultiVersionedKeyQuery class and the ValueIterator interface will be used.

package org.apache.kafka.streams.state;

/**
 * Iterator interface over values of type {@code V}.
 * <p>
 * Users must call its {@code close} method explicitly upon completeness to release resources,
 * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
 * Note that {@code remove()} is not supported.
 *
 * @param <V> Type of values
 */
public interface ValueIterator<V> extends Iterator<V>, Closeable {

    @Override
    void close();

    /**
     * Peek the next value without advancing the iterator
     * @return the value that would be returned from the next call to next
     */
    V peek();
}
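
As an illustration of the contract above, the following sketch backs the iterator with an in-memory list. `ListValueIterator` and `ValueIteratorDemo` are hypothetical names, the interface is copied locally only so that the example compiles on its own, and the behaviour of `peek()` on an exhausted or closed iterator (throwing NoSuchElementException) is an assumption, since the KIP does not specify it.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class ValueIteratorDemo {
    public static void main(final String[] args) {
        // try-with-resources closes the iterator automatically
        try (ValueIterator<String> it = new ListValueIterator<>(Arrays.asList("a", "b"))) {
            System.out.println(it.peek()); // looks at "a" without consuming it
            System.out.println(it.next()); // consumes "a"
            System.out.println(it.next()); // consumes "b"
            System.out.println(it.hasNext());
        }
    }
}

// Local copy of the proposed interface so that this sketch is self-contained.
interface ValueIterator<V> extends Iterator<V>, Closeable {
    @Override
    void close();

    V peek();
}

// Hypothetical list-backed implementation, for illustration only.
final class ListValueIterator<V> implements ValueIterator<V> {
    private final List<V> values;
    private int position = 0;
    private boolean closed = false;

    ListValueIterator(final List<V> values) {
        this.values = values;
    }

    @Override
    public boolean hasNext() {
        return !closed && position < values.size();
    }

    @Override
    public V next() {
        final V value = peek(); // throws if nothing is left
        position++;
        return value;
    }

    @Override
    public V peek() {
        if (!hasNext()) {
            throw new NoSuchElementException(); // assumed behaviour, not specified by the KIP
        }
        return values.get(position);
    }

    @Override
    public void close() {
        closed = true; // a store-backed iterator would release its resources here
    }
}
```

`remove()` is not overridden, so the default from `java.util.Iterator` applies and throws UnsupportedOperationException, which matches the note in the Javadoc above.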

MultiVersionedKeyQuery class

package org.apache.kafka.streams.query;

/**
 * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
 */
@Evolving
public final class MultiVersionedKeyQuery<K, V> implements Query<ValueIterator<VersionedRecord<V>>> {

  private final K key;
  private final Optional<Instant> fromTime;
  private final Optional<Instant> toTime;
  private final boolean isAscending;

  private MultiVersionedKeyQuery(
      final K key,
      final Optional<Instant> fromTime,
      final Optional<Instant> toTime,
      final boolean isAscending) {
    this.key = Objects.requireNonNull(key);
    this.fromTime = fromTime;
    this.toTime = toTime;
    this.isAscending = isAscending;
  }

  /**
   * Creates a query that will retrieve the set of records identified by {@code key} if any exist
   * (or {@code null} otherwise).
   * @param key The key to retrieve
   * @param <K> The type of the key
   * @param <V> The type of the value that will be retrieved
   * @throws NullPointerException if {@code key} is null
   */
  public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key);

  /**
   * Specifies the starting time point for the key query.
   * <pre>
   * The key query returns all the records that are still valid in the time range starting from the timestamp {@code fromTime}. Records that were inserted before {@code fromTime}
   * and are valid in the queried time range (over the whole range or only part of it) are included. In other words, the key query returns all the records whose validity had NOT ended (for example, through a tombstone) at or before {@code fromTime}.
   * </pre>
   * @param fromTime The starting time point
   * If {@code fromTime} is null, it is treated as an empty optional
   */
  public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime);

  /**
   * Specifies the ending time point for the key query.
   * The key query returns all the records that have timestamp <= {@code toTime}.
   * @param toTime The ending time point
   * If {@code toTime} is null, it is treated as an empty optional
   */
  public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime);

  /**
   * Specifies the order of the returned records by the query as descending by timestamp.
   */
  public MultiVersionedKeyQuery<K, V> withDescendingTimestamps();

  /**
   * Specifies the order of the returned records by the query as ascending by timestamp.
   */
  public MultiVersionedKeyQuery<K, V> withAscendingTimestamps();

  /**
   * The key that was specified for this query.
   */
  public K key();

  /**
   * The starting time point of the query, if specified
   */
  public Optional<Instant> fromTime();

  /**
   * The ending time point of the query, if specified
   */
  public Optional<Instant> toTime();

  /**
   * @return true if the query returns records in ascending order of timestamps
   */
  public boolean isAscending();
}
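
The time-bound semantics described in the Javadoc above can be sketched as a filter over the versions of one key. This is only one reading of the KIP text, not the store's actual lookup code: `TimeRangeSketch`, `Version`, and `select` are hypothetical, a version is taken to match when its timestamp is at most toTime and its validTo is later than fromTime, and an unspecified bound is modelled as Long.MIN_VALUE or Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in types for illustration only; NOT the Kafka Streams implementation.
final class TimeRangeSketch {

    static final class Version {
        final int value;
        final long timestamp;
        final long validTo;

        Version(final int value, final long timestamp, final long validTo) {
            this.value = value;
            this.timestamp = timestamp;
            this.validTo = validTo;
        }
    }

    /**
     * Selects the versions that overlap [fromTime, toTime].
     * Assumed rule: inserted at or before toTime, and validity not ended at or before fromTime.
     */
    static List<Version> select(final List<Version> versionsAscending,
                                final long fromTime,
                                final long toTime,
                                final boolean ascending) {
        final List<Version> result = new ArrayList<>();
        for (final Version v : versionsAscending) {
            final boolean insertedInTime = v.timestamp <= toTime;
            final boolean stillValid = v.validTo > fromTime;
            if (insertedInTime && stillValid) {
                result.add(v);
            }
        }
        if (!ascending) {
            Collections.reverse(result); // input is ascending, so reversing gives descending order
        }
        return result;
    }

    public static void main(final String[] args) {
        // Timestamps are day numbers here purely to keep the example readable.
        final List<Version> all = Arrays.asList(
            new Version(1, 1L, 5L),
            new Version(2, 15L, 20L),
            new Version(3, 20L, Long.MAX_VALUE));
        for (final Version v : select(all, 17L, 25L, true)) {
            System.out.println("value=" + v.value);
        }
    }
}
```

For the range from day 17 to day 25, the sketch selects values 2 and 3: value 2 was inserted before the range but is still valid at its start, while value 1 ended on day 5 and is excluded.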


Another get method is added to the VersionedKeyValueStore interface.

package org.apache.kafka.streams.state;

public interface VersionedKeyValueStore<K, V> extends StateStore {
    /**
     * Get the record associated with this key as of the specified timestamp (i.e.,
     * the existing record with the largest timestamp not exceeding the provided
     * timestamp bound).
     *
     * @param key           The key to fetch
     * @param fromTime      The timestamp lower bound. The records that were inserted at
     *                      or before this timestamp and had not been deleted by a tombstone
     *                      at or before this timestamp will be retrieved and returned.
     * @param toTime        The timestamp upper bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp (along with the validTo timestamp) of the record associated with this key
     *         as of the provided timestamp, or {@code null} if no such record exists
     *         (including if the provided timestamp bound is older than this store's history
     *         retention time, i.e., the store no longer contains data for the provided
     *         timestamp). 
     * @throws NullPointerException       If null is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K key, long fromTime, long toTime);
}

Examples

The following examples illustrate the use of the MultiVersionedKeyQuery class to query a versioned state store.
Imagine we have the following records:

put(1, 1, time=2023-01-01T10:00:00.00Z)

put(1, null, time=2023-01-05T10:00:00.00Z)

put(1, null, time=2023-01-10T10:00:00.00Z)

put(1, 2, time=2023-01-15T10:00:00.00Z)

put(1, 3, time=2023-01-20T10:00:00.00Z)

// example 1: MultiVersionedKeyQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedKeyQuery<Integer, Integer> query1 = MultiVersionedKeyQuery.withKey(1);

final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);

final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult1 = kafkaStreams.query(request1);

// Get the results from all partitions
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults1 = versionedKeyResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) {
    try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            final long timestamp = record.timestamp();
            final long validTo = record.validTo();
            final Integer value = record.value();
            // Kafka record timestamps are epoch milliseconds, hence ofEpochMilli
            System.out.println("value: " + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + Instant.ofEpochMilli(validTo));
        }
    }
}
/* the printed output will be
	value: 1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z  
	value: 2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z  
	value: 3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now  
*/

// example 2: The values of the record with key=1 from 2023-01-17T10:00:00.00Z till 2023-01-25T10:00:00.00Z

MultiVersionedKeyQuery<Integer, Integer> query2 = MultiVersionedKeyQuery.withKey(1);
query2 = query2.fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-25T10:00:00.00Z"));

final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);

final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult2 = kafkaStreams.query(request2);

// Get the results from all partitions
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults2 = versionedKeyResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
    try (final ValueIterator<VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            final long timestamp = record.timestamp();
            final long validTo = record.validTo();
            final Integer value = record.value();
            // Kafka record timestamps are epoch milliseconds, hence ofEpochMilli
            System.out.println("value: " + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + Instant.ofEpochMilli(validTo));
        }
    }
}
/* the printed output will be   	
	value: 2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z  
	value: 3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now    
*/  

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

To make consecutive tombstones retrievable, we could add a method or flag (disabled by default) that lets users get all tombstones. If this turns out to be a real use case for users, we will add it later.

Test Plan

The single-key, multi-timestamp interactive queries will be tested in the versioned store IQv2 integration test (like the non-versioned key queries). In addition, unit tests will be added wherever needed.