...


Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-15347 

...

  • we propose a public class, MultiVersionedKeyQuery,
  • and a public enum, ResultOrder.
  • Moreover, the public interface VersionedRecordIterator is added to iterate over the different values that are returned from a single-key query (each value corresponds to a timestamp).
  • In addition, a new method is added to the VersionedKeyValueStore interface to support single-key, multi-timestamp queries.
  • Finally, a field called validTo is added to the VersionedRecord class so that tombstones can be represented as well.

...

To be able to list the tombstones, the Optional field validTo is added to the VersionedRecord class. The default value of validTo is Optional.empty(), which means the record is still valid.

Code Block: VersionedRecord.java
package org.apache.kafka.streams.state;

import java.util.Objects;
import java.util.Optional;

public final class VersionedRecord<V> {

    /**
     * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
     *
     * @param value      The value
     * @param timestamp  The timestamp
     */
    public VersionedRecord(final V value, final long timestamp) {
        this.value = Objects.requireNonNull(value);
        this.timestamp = timestamp;
        this.validTo = Optional.empty();
    }

    /**
     * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
     *
     * @param value      The value
     * @param timestamp  The timestamp
     * @param validTo    The exclusive upper bound of the validity interval
     */
    public VersionedRecord(final V value, final long timestamp, final Optional<Long> validTo);

    /**
     * Returns the {@code validTo}
     */
    public Optional<Long> validTo();
}
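As a rough illustration of how a caller might read the Optional validTo, the sketch below checks whether a record version is valid at a given time. It assumes that validTo is an exclusive upper bound (as the javadoc above states) and that an empty validTo means "still valid". SimpleVersionedRecord and isValidAt are hypothetical names used only for this sketch; they are not part of the proposed API.

```java
import java.util.Optional;

class ValidToSketch {

    // Hypothetical stand-in for VersionedRecord, for illustration only (not the Kafka class).
    static final class SimpleVersionedRecord<V> {
        final V value;
        final long timestamp;
        final Optional<Long> validTo;

        SimpleVersionedRecord(final V value, final long timestamp, final Optional<Long> validTo) {
            this.value = value;
            this.timestamp = timestamp;
            this.validTo = validTo;
        }

        // Assumed reading: valid at ts when timestamp <= ts < validTo;
        // an empty validTo means there is no upper bound yet.
        boolean isValidAt(final long ts) {
            return timestamp <= ts && validTo.map(end -> ts < end).orElse(true);
        }
    }

    public static void main(final String[] args) {
        final SimpleVersionedRecord<String> closed =
            new SimpleVersionedRecord<>("v1", 10L, Optional.of(20L));
        final SimpleVersionedRecord<String> open =
            new SimpleVersionedRecord<>("v2", 20L, Optional.empty());

        System.out.println(closed.isValidAt(15L));  // true
        System.out.println(closed.isValidAt(20L));  // false, because validTo is exclusive
        System.out.println(open.isValidAt(1_000L)); // true, because validTo is empty
    }
}
```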


For single-key queries, the MultiVersionedKeyQuery class and the VersionedRecordIterator interface will be used.

Code Block: VersionedRecordIterator.java
package org.apache.kafka.streams.state;

import java.io.Closeable;
import java.util.Iterator;

/**
 * Iterator interface of {@link VersionedRecord}.
 * <p>
 * Users must call its {@code close} method explicitly upon completeness to release resources,
 * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
 * Note that {@code remove()} is not supported.
 *
 * @param <V> Type of values
 */
public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>, Closeable {

    @Override
    void close();
}
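The closing contract described in the javadoc above can be sketched with a small in-memory iterator. This is only an illustration under stated assumptions: ClosableIterator and ListBackedIterator are hypothetical names, and a real store-backed iterator would release store resources in close() instead of flipping a flag.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ClosableIteratorSketch {

    // Hypothetical analogue of the interface above, generic over any element type.
    interface ClosableIterator<T> extends Iterator<T>, Closeable {
        @Override
        void close(); // narrows Closeable.close() so callers need not catch IOException
    }

    // Hypothetical list-backed implementation, used only to show the contract.
    static final class ListBackedIterator<T> implements ClosableIterator<T> {
        private final Iterator<T> delegate;
        private boolean closed = false;

        ListBackedIterator(final List<T> items) {
            this.delegate = items.iterator();
        }

        @Override
        public boolean hasNext() {
            return !closed && delegate.hasNext();
        }

        @Override
        public T next() {
            if (closed) {
                throw new IllegalStateException("iterator is closed");
            }
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true; // a store-backed iterator would release its resources here
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Counts the elements and closes the iterator via try-with-resources.
    static <T> int countAndClose(final ListBackedIterator<T> source) {
        int count = 0;
        try (ListBackedIterator<T> iterator = source) {
            while (iterator.hasNext()) {
                iterator.next();
                count++;
            }
        }
        return count;
    }

    public static void main(final String[] args) {
        final ListBackedIterator<String> it =
            new ListBackedIterator<>(Arrays.asList("a", "b", "c"));
        System.out.println(countAndClose(it)); // 3
        System.out.println(it.isClosed());     // true
    }
}
```

Because close() is redeclared without a checked exception, the try-with-resources statement needs no catch clause, which is the same design choice the proposed interface makes.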

ResultOrder enum
It specifies the order in which the query returns its results.


Code Block: ResultOrder
package org.apache.kafka.streams.query;

public enum ResultOrder {
    ANY,
    ASCENDING,
    DESCENDING
}
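One way to picture the three constants is to apply them to a list of timestamps. The sketch below is an assumption about how a caller could interpret the enum, not the store's implementation; Order is a local mirror of the enum so that the example compiles on its own, and arrange is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class ResultOrderSketch {

    // Local mirror of the enum above so that the sketch compiles on its own.
    enum Order { ANY, ASCENDING, DESCENDING }

    // Returns a copy of the timestamps arranged according to the requested order.
    static List<Long> arrange(final List<Long> timestamps, final Order order) {
        final List<Long> copy = new ArrayList<>(timestamps);
        switch (order) {
            case ASCENDING:
                copy.sort(Comparator.naturalOrder());
                break;
            case DESCENDING:
                copy.sort(Comparator.reverseOrder());
                break;
            default:
                // ANY: no ordering is promised, so the input order is kept as is
                break;
        }
        return copy;
    }

    public static void main(final String[] args) {
        final List<Long> timestamps = Arrays.asList(3L, 1L, 2L);
        System.out.println(arrange(timestamps, Order.ASCENDING));  // [1, 2, 3]
        System.out.println(arrange(timestamps, Order.DESCENDING)); // [3, 2, 1]
        System.out.println(arrange(timestamps, Order.ANY));        // [3, 1, 2]
    }
}
```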

MultiVersionedKeyQuery class

  • The methods are composable. The fromTime(Instant fromTime) and toTime(Instant toTime) methods specify the time range.
    • If a user applies the same time limit multiple times such as MultiVersionedKeyQuery.withKey(k).fromTime(t1).fromTime(t2), then the last one wins (it will be translated to MultiVersionedKeyQuery.withKey(k).fromTime(t2)).

    • Defining a query with time range (empty, t1] will be translated into [0, t1] (calling only the toTime(t1) method).
    • Defining a query with time range [t1, empty) will be translated into [t1, MAX) (calling only the fromTime(t1) method).
    • A query with no specified time range will be translated into [0, MAX). It means that the query will return all the versions of the records with specified key.
  • As explained in the javadocs, the query returns all valid records within the specified time range.
    • The fromTime specifies the starting point. There can be records that were inserted before fromTime and are still valid in the time range. For example, if the record (k,v) was inserted at time=0, it will be returned by multi-versioned key queries with key=k and fromTime>=0. If the record (k,v) becomes a tombstone at time=2, then multi-versioned key queries with key=k and fromTime>=2 will no longer return it, while queries with key=k and fromTime<2 will return the record (k,v) with validTo=2.
    • The toTime specifies the ending point. Records that were inserted at toTime are returned by the query as well.
  • No ordering is guaranteed for the results, but the results can be sorted by timestamp (in ascending or descending order) by calling the corresponding defined methods (withAscendingTimestamps() and withDescendingTimestamps() respectively).
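The time-range rules in the list above can be sketched as a simple filter. This is a minimal illustration under assumptions, not the store's implementation: Version and query are hypothetical names, timestamps are plain long values, a missing fromTime is read as 0, and a missing toTime is read as Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class TimeRangeSketch {

    // Hypothetical stand-in for one stored record version (not a Kafka class).
    static final class Version {
        final String value;
        final long timestamp;         // insertion time
        final Optional<Long> validTo; // empty means the version is still valid

        Version(final String value, final long timestamp, final Optional<Long> validTo) {
            this.value = value;
            this.timestamp = timestamp;
            this.validTo = validTo;
        }
    }

    // Returns the values of all versions that are valid somewhere in [fromTime, toTime].
    static List<String> query(final List<Version> versions,
                              final Optional<Long> fromTime,
                              final Optional<Long> toTime) {
        final long from = fromTime.orElse(0L);
        final long to = toTime.orElse(Long.MAX_VALUE);
        final List<String> result = new ArrayList<>();
        for (final Version v : versions) {
            // toTime is inclusive: versions inserted exactly at toTime are returned
            final boolean insertedByTo = v.timestamp <= to;
            // versions that became a tombstone at or before fromTime are excluded
            final boolean validAfterFrom = v.validTo.map(end -> end > from).orElse(true);
            if (insertedByTo && validAfterFrom) {
                result.add(v.value);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final List<Version> versions = Arrays.asList(
            new Version("v", 0L, Optional.of(2L)),   // inserted at 0, tombstone at 2
            new Version("w", 5L, Optional.empty())); // inserted at 5, still valid

        System.out.println(query(versions, Optional.empty(), Optional.empty())); // [v, w]
        System.out.println(query(versions, Optional.of(2L), Optional.empty()));  // [w]
        System.out.println(query(versions, Optional.of(1L), Optional.of(4L)));   // [v]
    }
}
```

The second call mirrors the example in the list: a record that became a tombstone at time=2 is no longer returned once fromTime>=2.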

Code Block: MultiVersionedKeyQuery.java
package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.VersionedRecordIterator;

/**
 * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
 * No ordering is guaranteed for the results, but the results can be sorted by timestamp (in ascending or descending order) by calling the corresponding defined methods.
 *
 * @param <K> The type of the key.
 * @param <V> The type of the result returned by this query.
 */
@Evolving
public final class MultiVersionedKeyQuery<K, V> implements Query<VersionedRecordIterator<V>> {

  private final K key;
  private final Optional<Instant> fromTime;
  private final Optional<Instant> toTime;
  private final ResultOrder order;

  private MultiVersionedKeyQuery(
      final K key,
      final Optional<Instant> fromTime,
      final Optional<Instant> toTime,
      final ResultOrder order) {
    this.key = Objects.requireNonNull(key);
    this.fromTime = fromTime;
    this.toTime = toTime;
    this.order = order;
  }

  /**
   * Creates a query that will retrieve the set of records identified by {@code key} if any exists
   * (or {@code null} otherwise).
   *
   * <p>
   * While the query by default returns all the record versions of the specified {@code key}, setting
   * the {@code fromTimestamp} (by calling the {@link #fromTime(Instant)} method), and the {@code toTimestamp}
   * (by calling the {@link #toTime(Instant)} method) makes the query return the record versions associated
   * to the specified time range.
   *
   * @param key The specified key by the query
   * @param <K> The type of the key
   * @param <V> The type of the value that will be retrieved
   * @throws NullPointerException if {@code key} is null
   */
  public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key);

  /**
   * Specifies the starting time point for the key query.
   * <p>
   * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can
   * be records which have been inserted before the {@code fromTime} and are still valid in the query specified time range (the whole time range
   * or even partially). The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}.
   *
   * @param fromTime The starting time point
   * If {@code fromTime} is null, will be considered as negative infinity, ie, no lower bound
   */
  public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime);

  /**
   * Specifies the ending time point for the key query.
   * The key query returns all the records that have timestamp <= toTime.
   *
   * @param toTime The ending time point
   * If {@code toTime} is null, will be considered as positive infinity, ie, no upper bound
   */
  public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime);

  /**
   * Specifies the order of the returned records by the query as descending by timestamp.
   */
  public MultiVersionedKeyQuery<K, V> withDescendingTimestamps();

  /**
   * Specifies the order of the returned records by the query as ascending by timestamp.
   */
  public MultiVersionedKeyQuery<K, V> withAscendingTimestamps();

  /**
   * The key that was specified for this query.
   * @return The specified {@code key} of the query.
   */
  public K key();

  /**
   * The starting time point of the query, if specified
   * @return The specified {@code fromTime} of the query.
   */
  public Optional<Instant> fromTime();

  /**
   * The ending time point of the query, if specified
   * @return The specified {@code toTime} of the query.
   */
  public Optional<Instant> toTime();

  /**
   * The order of the returned records by timestamp.
   * @return ANY, ASCENDING, or DESCENDING if the query returns records in an unordered, ascending, or descending order of timestamps.
   */
  public ResultOrder resultOrder();
}

...

Code Block: VersionedKeyValueStore.java
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;

public interface VersionedKeyValueStore<K, V> extends StateStore {
    /**
     * Get the record associated with this key as of the specified timestamp (i.e.,
     * the existing record with the largest timestamp not exceeding the provided
     * timestamp bound).
     *
     * @param key           The key to fetch
     * @param fromTime      The timestamp lower bound. The records that have been inserted at
     *                      or before this timestamp and did not become tombstone at or before
     *                      this timestamp will be retrieved and returned.
     * @param toTime        The timestamp bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp (along with the validTo timestamp) of the record associated with this key
     *         as of the provided timestamp, or {@code null} if no such record exists
     *         (including if the provided timestamp bound is older than this store's history
     *         retention time, i.e., the store no longer contains data for the provided
     *         timestamp).
     * @throws NullPointerException       If null is used for key.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K key, long fromTime, long toTime);
}


Examples

The following example illustrates the use of the MultiVersionedKeyQuery class to query a versioned state store.
Imagine we have the following records 

...

Code Block
import java.time.Instant;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;

import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;

// example 1: MultiVersionedKeyQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedKeyQuery<Integer, Integer> query1 = MultiVersionedKeyQuery.withKey(1);

final StateQueryRequest<VersionedRecordIterator<Integer>> request1 =
         StateQueryRequest.inStore("my_store").withQuery(query1);

final StateQueryResult<VersionedRecordIterator<Integer>> versionedKeyResult1 = kafkaStreams.query(request1);

// Get the results from all partitions
final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults1 = versionedKeyResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> entry : partitionResults1.entrySet()) {
    try (final VersionedRecordIterator<Integer> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            final long timestamp = record.timestamp();
            final Optional<Long> validTo = record.validTo();
            final Integer value = record.value();
            // An empty validTo means the record is still valid, which is printed as "now"
            final String validTill = validTo.map(t -> Instant.ofEpochSecond(t).toString()).orElse("now");
            System.out.println("value: " + value + ", timestamp: " + Instant.ofEpochSecond(timestamp) + ", valid till: " + validTill);
        }
    }
}
/* the printed output will be
    value: 1, timestamp: 2023-01-01T10:00:00Z, valid till: 2023-01-05T10:00:00Z
    value: 2, timestamp: 2023-01-15T10:00:00Z, valid till: 2023-01-20T10:00:00Z
    value: 3, timestamp: 2023-01-20T10:00:00Z, valid till: now
*/

// example 2: The value of the record with key=1 from 2023-01-17T10:00:00.00Z till 2023-01-25T10:00:00.00Z

MultiVersionedKeyQuery<Integer, Integer> query2 = MultiVersionedKeyQuery.withKey(1);
query2 = query2.fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-25T10:00:00.00Z"));

final StateQueryRequest<VersionedRecordIterator<Integer>> request2 =
         StateQueryRequest.inStore("my_store").withQuery(query2);

final StateQueryResult<VersionedRecordIterator<Integer>> versionedKeyResult2 = kafkaStreams.query(request2);

// Get the results from all partitions
final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults2 = versionedKeyResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> entry : partitionResults2.entrySet()) {
    try (final VersionedRecordIterator<Integer> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            final long timestamp = record.timestamp();
            final Optional<Long> validTo = record.validTo();
            final Integer value = record.value();
            // An empty validTo means the record is still valid, which is printed as "now"
            final String validTill = validTo.map(t -> Instant.ofEpochSecond(t).toString()).orElse("now");
            System.out.println("value: " + value + ", timestamp: " + Instant.ofEpochSecond(timestamp) + ", valid till: " + validTill);
        }
    }
}
/* the printed output will be
    value: 2, timestamp: 2023-01-15T10:00:00Z, valid till: 2023-01-20T10:00:00Z
    value: 3, timestamp: 2023-01-20T10:00:00Z, valid till: now
*/

...