...

  1. single-key latest-value lookup
  2. single-key lookup with asOf timestamp (upper) bound
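The two lookup types above can be illustrated with a minimal, self-contained sketch. It assumes the version history of a single key can be modeled as a sorted map from record timestamp to value; the class `VersionHistorySketch` and its methods are hypothetical stand-ins for illustration and are not part of Kafka Streams. Tombstones and validity intervals are deliberately ignored here.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical model of one key's version history (assumption: not the real versioned store).
final class VersionHistorySketch {

    // All versions of a single key, sorted by record timestamp.
    private final TreeMap<Instant, String> versions = new TreeMap<>();

    void put(final Instant timestamp, final String value) {
        versions.put(timestamp, value);
    }

    // Query type 1: latest-value lookup, i.e. the version with the greatest timestamp.
    Optional<String> latest() {
        final Map.Entry<Instant, String> entry = versions.lastEntry();
        return Optional.ofNullable(entry).map(Map.Entry::getValue);
    }

    // Query type 2: lookup with an inclusive upper bound,
    // i.e. the version with the greatest timestamp <= asOfTimestamp.
    Optional<String> asOf(final Instant asOfTimestamp) {
        final Map.Entry<Instant, String> entry = versions.floorEntry(asOfTimestamp);
        return Optional.ofNullable(entry).map(Map.Entry::getValue);
    }

    public static void main(final String[] args) {
        final VersionHistorySketch history = new VersionHistorySketch();
        history.put(Instant.ofEpochMilli(100), "v1");
        history.put(Instant.ofEpochMilli(200), "v2");

        System.out.println(history.latest());                        // newest version
        System.out.println(history.asOf(Instant.ofEpochMilli(150))); // version written at t=100
        System.out.println(history.asOf(Instant.ofEpochMilli(50)));  // nothing existed yet
    }
}
```

Because the bound is inclusive, a lookup as of exactly t=200 would return the version written at t=200 rather than the one before it.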


Proposed Changes

In this KIP we propose to introduce the class VersionedKeyQuery with an Optional field to store the asOfTimestamp value. The method asOf creates key queries that carry an asOfTimestamp value.
Defining a latest() method is not needed, since returning the latest value has always been the default behavior.

VersionedKeyQuery.java
package org.apache.kafka.streams.query;

import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.VersionedRecord;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

/**
 * Interactive query for retrieving a single record from a versioned state store based on its key and timestamp.
 * <p>
 * See KIP-960 for more details.
 */
@Evolving
public final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> {

    private final K key;
    private final Optional<Instant> asOfTimestamp;

    private VersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp) {
        this.key = Objects.requireNonNull(key);
        this.asOfTimestamp = asOfTimestamp;
    }

    /**
     * Creates a query that will retrieve the record from a versioned state store identified by {@code key} if it exists
     * (or {@code null} otherwise).
     * @param key The key to retrieve
     * @throws NullPointerException if {@code key} is null
     * @param <K> The type of the key
     * @param <V> The type of the value that will be retrieved
     */
    public static <K, V> VersionedKeyQuery<K, V> withKey(final K key);

    /**
     * Specifies the as-of timestamp for the key query. The key query returns the record
     * with the greatest timestamp {@code <= asOfTimestamp}.
     * @param asOfTimestamp The as-of timestamp, used as an inclusive upper bound on the record timestamp
     * @throws NullPointerException if {@code asOfTimestamp} is null
     */
    public VersionedKeyQuery<K, V> asOf(final Instant asOfTimestamp);

    /**
     * The key that was specified for this query.
     */
    public K getKey();

    /**
     * The timestamp of the query, if specified
     */
    public Optional<Instant> getAsOfTimestamp();
}  
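
The listing above only declares the method signatures. As a rough illustration of how the factory and the asOf method could be filled in, the following is a minimal sketch under stated assumptions: the class name `VersionedKeyQuerySketch` is hypothetical, it drops the `Query<VersionedRecord<V>>` interface so that it compiles without Kafka on the classpath, and the method bodies are a guess at one straightforward implementation, not the code proposed by this KIP.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for VersionedKeyQuery (assumption: method bodies are illustrative only).
final class VersionedKeyQuerySketch<K, V> {

    private final K key;
    private final Optional<Instant> asOfTimestamp;

    private VersionedKeyQuerySketch(final K key, final Optional<Instant> asOfTimestamp) {
        this.key = Objects.requireNonNull(key);
        this.asOfTimestamp = asOfTimestamp;
    }

    // No timestamp bound is set, so the query asks for the latest value by default.
    static <K, V> VersionedKeyQuerySketch<K, V> withKey(final K key) {
        return new VersionedKeyQuerySketch<>(key, Optional.empty());
    }

    // Returns a new query object instead of mutating this one, keeping queries immutable.
    VersionedKeyQuerySketch<K, V> asOf(final Instant asOfTimestamp) {
        Objects.requireNonNull(asOfTimestamp, "asOfTimestamp cannot be null");
        return new VersionedKeyQuerySketch<>(key, Optional.of(asOfTimestamp));
    }

    K getKey() {
        return key;
    }

    Optional<Instant> getAsOfTimestamp() {
        return asOfTimestamp;
    }

    public static void main(final String[] args) {
        final VersionedKeyQuerySketch<String, Long> latest = VersionedKeyQuerySketch.withKey("sensor-1");
        final VersionedKeyQuerySketch<String, Long> bounded = latest.asOf(Instant.ofEpochMilli(1_000L));

        System.out.println(latest.getAsOfTimestamp().isPresent());
        System.out.println(bounded.getAsOfTimestamp().isPresent());
    }
}
```

In this sketch an empty Optional stands for "latest value", which matches the statement that no separate latest() method is required.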

...