...
Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-15347
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
- We propose a public class MultiVersionedKeyQuery,
- and a public enum ResultOrder.
- Moreover, the public interface VersionedRecordIterator is added to iterate over the different values returned by a single-key query (each value corresponds to a timestamp).
- In addition, a new method is added to the VersionedKeyValueStore interface to support single-key_multi-timestamp queries.
- Finally, a field called validTo is added to the VersionedRecord class so that tombstones can be represented as well.
...
To be able to list the tombstones, the optional validTo field is added to the VersionedRecord class. validTo is the exclusive upper bound of the record's validity interval. Its default value is Optional.empty(), which means the record is still valid.
```java
package org.apache.kafka.streams.state;

import java.util.Objects;
import java.util.Optional;

public final class VersionedRecord<V> {
    private final V value;
    private final long timestamp;
    private final Optional<Long> validTo;

    /**
     * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
     * The {@code validTo} of the created record is {@code Optional.empty()}, i.e., the record is still valid.
     *
     * @param value     The value
     * @param timestamp The timestamp
     */
    public VersionedRecord(final V value, final long timestamp) {
        this.value = Objects.requireNonNull(value);
        this.timestamp = timestamp;
        this.validTo = Optional.empty();
    }

    /**
     * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
     *
     * @param value     The value
     * @param timestamp The timestamp
     * @param validTo   The exclusive upper bound of the validity interval
     */
    public VersionedRecord(final V value, final long timestamp, final long validTo);

    /**
     * Returns the {@code validTo}
     */
    public Optional<Long> validTo();
}
```
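The validity interval that validTo describes can be illustrated with a small, self-contained sketch. VersionSketch is a hypothetical stand-in written only for this illustration (it is not the Kafka class); it assumes, as described above, that validTo is an exclusive upper bound and that an empty validTo means the record is still valid.

```java
import java.util.Objects;
import java.util.Optional;

/**
 * Hypothetical stand-in for VersionedRecord (not the Kafka class), used only to
 * illustrate the validTo semantics: a version is valid in [timestamp, validTo),
 * and an empty validTo means the version is still valid.
 */
final class VersionSketch<V> {
    private final V value;
    private final long timestamp;
    private final Optional<Long> validTo;

    /** Latest version of a key: no upper bound yet. */
    VersionSketch(final V value, final long timestamp) {
        this(value, timestamp, Optional.empty());
    }

    /** Superseded version: validTo is the (exclusive) time at which it stopped being valid. */
    VersionSketch(final V value, final long timestamp, final long validTo) {
        this(value, timestamp, Optional.of(validTo));
    }

    private VersionSketch(final V value, final long timestamp, final Optional<Long> validTo) {
        this.value = Objects.requireNonNull(value, "value cannot be null");
        this.timestamp = timestamp;
        this.validTo = validTo;
    }

    V value() { return value; }
    long timestamp() { return timestamp; }
    Optional<Long> validTo() { return validTo; }

    /** True if this version is the live value at time t (timestamp inclusive, validTo exclusive). */
    boolean isValidAt(final long t) {
        return t >= timestamp && validTo.map(end -> t < end).orElse(true);
    }
}
```

With this model, a record written at time 0 and deleted at time 2 is visible at times 0 and 1 but not at time 2.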
For single-key queries, the MultiVersionedKeyQuery class and the VersionedRecordIterator interface will be used.
```java
package org.apache.kafka.streams.state;

import java.io.Closeable;
import java.util.Iterator;

/**
 * Iterator interface of {@link VersionedRecord VersionedRecord<V>}.
 * <p>
 * Users must call its {@code close} method explicitly upon completeness to release resources,
 * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
 * Note that {@code remove()} is not supported.
 *
 * @param <V> Type of values
 */
public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>, Closeable {

    @Override
    void close();
}
```
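Because VersionedRecordIterator re-declares close() without `throws IOException`, callers can use try-with-resources without handling a checked exception. The sketch below mirrors that shape with standard-library types only; ClosableIteratorSketch, ListIteratorSketch and IteratorUsageSketch.drain are hypothetical names for illustration, not Kafka APIs.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical interface with the same shape as VersionedRecordIterator, minus the Kafka types. */
interface ClosableIteratorSketch<T> extends Iterator<T>, Closeable {
    @Override
    void close(); // no "throws IOException", so try-with-resources needs no catch block
}

/** Hypothetical list-backed implementation that records whether it was closed. */
final class ListIteratorSketch<T> implements ClosableIteratorSketch<T> {
    private final Iterator<T> delegate;
    private boolean closed = false;

    ListIteratorSketch(final List<T> items) {
        this.delegate = new ArrayList<>(items).iterator();
    }

    @Override
    public boolean hasNext() {
        return !closed && delegate.hasNext();
    }

    @Override
    public T next() {
        if (closed) {
            throw new IllegalStateException("iterator already closed");
        }
        return delegate.next();
    }

    // remove() is deliberately not overridden: Iterator's default throws UnsupportedOperationException.

    @Override
    public void close() {
        closed = true; // a real store iterator would release its resources here
    }

    boolean isClosed() {
        return closed;
    }
}

final class IteratorUsageSketch {
    /** Drains an iterator; try-with-resources guarantees close() runs, even on exceptions. */
    static <T> List<T> drain(final ListIteratorSketch<T> iterator) {
        final List<T> out = new ArrayList<>();
        try (ListIteratorSketch<T> it = iterator) {
            while (it.hasNext()) {
                out.add(it.next());
            }
        }
        return out;
    }
}
```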
ResultOrder enum
It specifies the order in which the query returns its results.
```java
package org.apache.kafka.streams.query;

public enum ResultOrder {
    ANY,
    ASCENDING,
    DESCENDING
}
```
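As a rough illustration of what each constant asks for, the following sketch applies a mirror of the enum to an in-memory list of versions. ResultOrderSketch and OrderingSketch are hypothetical names; for ANY, the sketch simply keeps the input order, which is only one of the orders a store would be allowed to return.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical mirror of the ResultOrder constants, for illustration only. */
enum ResultOrderSketch { ANY, ASCENDING, DESCENDING }

final class OrderingSketch {
    /** Minimal hypothetical version: a value and the timestamp it was written at. */
    static final class Version {
        final int value;
        final long timestamp;

        Version(final int value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Returns a copy of versions arranged as requested by order. */
    static List<Version> arrange(final List<Version> versions, final ResultOrderSketch order) {
        final List<Version> copy = new ArrayList<>(versions);
        switch (order) {
            case ASCENDING:
                copy.sort(Comparator.comparingLong((Version v) -> v.timestamp));
                break;
            case DESCENDING:
                copy.sort(Comparator.comparingLong((Version v) -> v.timestamp).reversed());
                break;
            default:
                // ANY: no ordering is promised; this sketch keeps the input order.
                break;
        }
        return copy;
    }
}
```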
MultiVersionedKeyQuery class
- The methods are composable. The fromTime(Instant fromTime) and toTime(Instant toTime) methods specify the time range.
- If a user applies the same time limit multiple times, such as MultiVersionedKeyQuery.withKey(k).fromTime(t1).fromTime(t2), then the last one wins (the query is translated to MultiVersionedKeyQuery.withKey(k).fromTime(t2)).
- Defining a query with time range (empty, t1] (calling only the toTime(t1) method) will be translated into [0, t1].
- Defining a query with time range [t1, empty) (calling only the fromTime(t1) method) will be translated into [t1, MAX).
- A query with no specified time range will be translated into [0, MAX). This means that the query will return all the versions of the records with the specified key.
- As explained in the javadocs, the query returns all records that are valid at some point within the specified time range.
- fromTime specifies the starting point. Records that were inserted before fromTime can still be valid in the time range. For example, if the record (k,v) was inserted at time=0, it is returned by multi-versioned key queries with key=k and fromTime>=0. However, if the record (k,v) becomes a tombstone at time=2, then multi-versioned key queries with key=k and fromTime>=2 no longer return it, while queries with key=k and fromTime<2 return the record (k,v) with validTo=2.
- toTime specifies the ending point. Records that were inserted exactly at toTime are returned by the query as well.
- No ordering is guaranteed for the results, but the results can be sorted by timestamp in ascending or descending order by calling withAscendingTimestamps() or withDescendingTimestamps(), respectively.
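The range rules above boil down to an interval-overlap check: a version is returned if it was inserted at or before toTime and had not yet been superseded or deleted at fromTime. The sketch below is a hypothetical, in-memory illustration of that check (TimeRangeFilterSketch is not a Kafka class); it uses small integers instead of epoch milliseconds and treats a missing fromTime as 0 and a missing toTime as Long.MAX_VALUE, following the translations listed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class TimeRangeFilterSketch {
    /** Hypothetical version: valid in [timestamp, validTo); empty validTo = still valid. */
    static final class Version {
        final int value;
        final long timestamp;
        final Optional<Long> validTo;

        Version(final int value, final long timestamp, final Optional<Long> validTo) {
            this.value = value;
            this.timestamp = timestamp;
            this.validTo = validTo;
        }
    }

    /**
     * Keeps the versions whose validity interval overlaps [fromTime, toTime]:
     * inserted at or before toTime, and not yet superseded (or deleted) at fromTime.
     */
    static List<Version> query(final List<Version> versions,
                               final Optional<Long> fromTime,
                               final Optional<Long> toTime) {
        final long from = fromTime.orElse(0L);            // (empty, t1] -> [0, t1]
        final long to = toTime.orElse(Long.MAX_VALUE);    // [t1, empty) -> [t1, MAX)
        final List<Version> result = new ArrayList<>();
        for (final Version v : versions) {
            final boolean insertedInTime = v.timestamp <= to;                   // toTime is inclusive
            final boolean stillValid = v.validTo.map(end -> end > from).orElse(true); // validTo is exclusive
            if (insertedInTime && stillValid) {
                result.add(v);
            }
        }
        return result;
    }
}
```

For the (k,v) example in the list, a version with timestamp 0 and validTo 2 is returned for fromTime=1 but not for fromTime=2.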
```java
package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.VersionedRecordIterator;

/**
 * Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
 * No ordering is guaranteed for the results, but the results can be sorted by timestamp (in ascending or descending order) by calling the corresponding defined methods.
 *
 * @param <K> The type of the key.
 * @param <V> The type of the result returned by this query.
 */
@Evolving
public final class MultiVersionedKeyQuery<K, V> implements Query<VersionedRecordIterator<V>> {

    private final K key;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;
    private final ResultOrder order;

    private MultiVersionedKeyQuery(
        final K key,
        final Optional<Instant> fromTime,
        final Optional<Instant> toTime,
        final ResultOrder order) {
        this.key = Objects.requireNonNull(key);
        this.fromTime = fromTime;
        this.toTime = toTime;
        this.order = order;
    }

    /**
     * Creates a query that will retrieve the set of records identified by {@code key} if any exists
     * (or {@code null} otherwise).
     * <p>
     * While the query by default returns all the record versions of the specified {@code key}, setting
     * the {@code fromTime} (by calling the {@link #fromTime(Instant)} method) and the {@code toTime}
     * (by calling the {@link #toTime(Instant)} method) makes the query return the record versions associated
     * with the specified time range.
     *
     * @param key The key specified by the query
     * @param <K> The type of the key
     * @param <V> The type of the value that will be retrieved
     * @throws NullPointerException if {@code key} is null
     */
    public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key);

    /**
     * Specifies the starting time point for the key query.
     * <p>
     * The key query returns all the records that still exist in the time range starting from the timestamp {@code fromTime}.
     * There can be records which were inserted before {@code fromTime} and are still valid in the query's time range
     * (for the whole time range or only partially). In fact, the key query returns all the records that had NOT become
     * tombstones at or before {@code fromTime}.
     *
     * @param fromTime The starting time point.
     *                 If {@code fromTime} is null, it is considered as negative infinity, i.e., no lower bound.
     */
    public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime);

    /**
     * Specifies the ending time point for the key query.
     * The key query returns all the records that have timestamp <= {@code toTime}.
     *
     * @param toTime The ending time point.
     *               If {@code toTime} is null, it is considered as positive infinity, i.e., no upper bound.
     */
    public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime);

    /**
     * Specifies the order of the returned records by the query as descending by timestamp.
     */
    public MultiVersionedKeyQuery<K, V> withDescendingTimestamps();

    /**
     * Specifies the order of the returned records by the query as ascending by timestamp.
     */
    public MultiVersionedKeyQuery<K, V> withAscendingTimestamps();

    /**
     * The key that was specified for this query.
     * @return The specified {@code key} of the query.
     */
    public K key();

    /**
     * The starting time point of the query, if specified.
     * @return The specified {@code fromTime} of the query.
     */
    public Optional<Instant> fromTime();

    /**
     * The ending time point of the query, if specified.
     * @return The specified {@code toTime} of the query.
     */
    public Optional<Instant> toTime();

    /**
     * The order of the returned records by timestamp.
     * @return ANY, ASCENDING, or DESCENDING if the query returns records in an unordered, ascending, or descending order of timestamps.
     */
    public ResultOrder resultOrder();
}
```
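The composition rules for this class (the last call wins, a null bound means unbounded, and no ordering unless one of the ordering methods is called) can be sketched as an immutable builder. RangeQuerySketch is a hypothetical, standard-library-only illustration, not the Kafka implementation; returning a new instance from every call is an assumption of this sketch that makes "last one wins" hold trivially.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

/**
 * Hypothetical, stdlib-only sketch of a composable, immutable range query
 * (illustration only; not the Kafka implementation of MultiVersionedKeyQuery).
 */
final class RangeQuerySketch<K> {
    enum Order { ANY, ASCENDING, DESCENDING }

    private final K key;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;
    private final Order order;

    private RangeQuerySketch(final K key, final Optional<Instant> fromTime,
                             final Optional<Instant> toTime, final Order order) {
        this.key = Objects.requireNonNull(key, "key cannot be null");
        this.fromTime = fromTime;
        this.toTime = toTime;
        this.order = order;
    }

    /** No bounds and no ordering guarantee by default. */
    static <K> RangeQuerySketch<K> withKey(final K key) {
        return new RangeQuerySketch<>(key, Optional.empty(), Optional.empty(), Order.ANY);
    }

    // Every call returns a new instance that overwrites one setting, so repeated
    // calls such as fromTime(t1).fromTime(t2) keep only the last value.
    RangeQuerySketch<K> fromTime(final Instant fromTime) {
        return new RangeQuerySketch<>(key, Optional.ofNullable(fromTime), toTime, order); // null -> unbounded
    }

    RangeQuerySketch<K> toTime(final Instant toTime) {
        return new RangeQuerySketch<>(key, fromTime, Optional.ofNullable(toTime), order); // null -> unbounded
    }

    RangeQuerySketch<K> withAscendingTimestamps() {
        return new RangeQuerySketch<>(key, fromTime, toTime, Order.ASCENDING);
    }

    RangeQuerySketch<K> withDescendingTimestamps() {
        return new RangeQuerySketch<>(key, fromTime, toTime, Order.DESCENDING);
    }

    K key() { return key; }
    Optional<Instant> fromTime() { return fromTime; }
    Optional<Instant> toTime() { return toTime; }
    Order resultOrder() { return order; }
}
```

A side benefit of immutability in this sketch is that a base query can be shared and refined without the refinements leaking back into it.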
Examples
The following example illustrates the use of the MultiVersionedKeyQuery class to query a versioned state store.
Assume the store contains the following records for key 1 (a put with a null value is a tombstone):
put(1, 1, time=2023-01-01T10:00:00.00Z)
put(1, null, time=2023-01-05T10:00:00.00Z)
put(1, null, time=2023-01-10T10:00:00.00Z)
put(1, 2, time=2023-01-15T10:00:00.00Z)
put(1, 3, time=2023-01-20T10:00:00.00Z)
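To see where the validTo values of the returned versions come from, the following hypothetical sketch derives validity intervals from a put history like the one above. It assumes that each non-null put stays valid until the next put for the same key (a value or a tombstone), that tombstones themselves are not returned, and that the last version has no validTo. PutHistorySketch is an illustration of these semantics, not how the versioned store is implemented.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class PutHistorySketch {
    /** Hypothetical put for a single key; a null value represents a tombstone. */
    static final class Put {
        final Integer value;
        final long timestamp; // epoch milliseconds

        Put(final Integer value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical derived version, valid in [timestamp, validTo). */
    static final class Version {
        final int value;
        final long timestamp;
        final Optional<Long> validTo;

        Version(final int value, final long timestamp, final Optional<Long> validTo) {
            this.value = value;
            this.timestamp = timestamp;
            this.validTo = validTo;
        }

        @Override
        public String toString() {
            return "value: " + value + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                + ", valid till: " + validTo.map(t -> Instant.ofEpochMilli(t).toString()).orElse("now");
        }
    }

    static List<Version> versions(final List<Put> history) {
        final List<Put> sorted = new ArrayList<>(history);
        sorted.sort(Comparator.comparingLong(p -> p.timestamp));
        final List<Version> out = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            final Put put = sorted.get(i);
            if (put.value == null) {
                continue; // tombstones only end the previous version; they are not returned
            }
            // The next put (value or tombstone) ends this version; the last one is still valid.
            final Optional<Long> validTo = i + 1 < sorted.size()
                ? Optional.of(sorted.get(i + 1).timestamp)
                : Optional.empty();
            out.add(new Version(put.value, put.timestamp, validTo));
        }
        return out;
    }
}
```

Note how the two consecutive tombstones collapse: only the first one ends the validity of value 1, and neither is returned.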
```java
import java.time.Instant;
import java.util.Map;
import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;

// Assumes kafkaStreams is a running KafkaStreams instance that hosts a versioned store named "my_store".

// example 1: MultiVersionedKeyQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedKeyQuery<Integer, Integer> query1 = MultiVersionedKeyQuery.withKey(1);
final StateQueryRequest<VersionedRecordIterator<Integer>> request1 = StateQueryRequest.inStore("my_store").withQuery(query1);
final StateQueryResult<VersionedRecordIterator<Integer>> versionedKeyResult1 = kafkaStreams.query(request1);
// Get the results from all partitions
final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults1 = versionedKeyResult1.getPartitionResults();
for (final Map.Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> entry : partitionResults1.entrySet()) {
    try (final VersionedRecordIterator<Integer> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            final long timestamp = record.timestamp();
            // validTo() is empty for the latest version, which is still valid
            final String validTo = record.validTo().map(t -> Instant.ofEpochMilli(t).toString()).orElse("now");
            final Integer value = record.value();
            System.out.println("value: " + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + validTo);
        }
    }
}
/* the printed output will be (the order may vary, since no ordering was requested)
value: 1, timestamp: 2023-01-01T10:00:00Z, valid till: 2023-01-05T10:00:00Z
value: 2, timestamp: 2023-01-15T10:00:00Z, valid till: 2023-01-20T10:00:00Z
value: 3, timestamp: 2023-01-20T10:00:00Z, valid till: now
*/

// example 2: the versions of key=1 that are valid between 2023-01-17T10:00:00Z and 2023-01-25T10:00:00Z
final MultiVersionedKeyQuery<Integer, Integer> query2 = MultiVersionedKeyQuery.<Integer, Integer>withKey(1)
    .fromTime(Instant.parse("2023-01-17T10:00:00Z"))
    .toTime(Instant.parse("2023-01-25T10:00:00Z"));
final StateQueryRequest<VersionedRecordIterator<Integer>> request2 = StateQueryRequest.inStore("my_store").withQuery(query2);
final StateQueryResult<VersionedRecordIterator<Integer>> versionedKeyResult2 = kafkaStreams.query(request2);
// Get the results from all partitions
final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults2 = versionedKeyResult2.getPartitionResults();
for (final Map.Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> entry : partitionResults2.entrySet()) {
    try (final VersionedRecordIterator<Integer> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            final long timestamp = record.timestamp();
            final String validTo = record.validTo().map(t -> Instant.ofEpochMilli(t).toString()).orElse("now");
            final Integer value = record.value();
            System.out.println("value: " + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + validTo);
        }
    }
}
/* the printed output will be (the order may vary, since no ordering was requested)
value: 2, timestamp: 2023-01-15T10:00:00Z, valid till: 2023-01-20T10:00:00Z
value: 3, timestamp: 2023-01-20T10:00:00Z, valid till: now
*/
```
...
```java
package org.apache.kafka.streams.state;

/**
 * Get the record associated with this key as of the specified timestamp (i.e.,
 * the existing record with the largest timestamp not exceeding the provided
 * timestamp bound).
 *
 * @param key           The key to fetch
 * @param fromTimestamp
 * @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record
 *                      (for the specified key) exists with this timestamp, then
 *                      this is the record that will be returned.
 * @return The value and timestamp of the record associated with this key
 *         as of the provided timestamp, or {@code null} if no such record exists
 *         (including if the provided timestamp bound is older than this store's history
 *         retention time, i.e., the store no longer contains data for the provided
 *         timestamp). Note that the record timestamp {@code r.timestamp()} of the
 *         returned {@link VersionedRecord} may be smaller than the provided timestamp
 *         bound. Additionally, if the latest record version for the key is eligible
 *         for the provided timestamp bound, then that record will be returned even if
 *         the timestamp bound is older than the store's history retention.
 * @throws NullPointerException       If null is used for key.
 * @throws InvalidStateStoreException if the store is not initialized
 */
VersionedRecord<V> get(K key, long fromTimestamp, long asOfTimestamp);
```
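The javadoc of get above describes an as-of lookup: the record with the largest timestamp not exceeding the inclusive bound. A sorted map's floor lookup captures that rule, so the sketch below uses java.util.TreeMap.floorEntry on a hypothetical in-memory history in which a null value marks a tombstone. AsOfLookupSketch is an illustrative name; the sketch ignores history retention and the fromTimestamp parameter, so it only models the as-of part of the contract.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory history for a single key (illustration only):
 * timestamp -> value, where a null value records a tombstone.
 */
final class AsOfLookupSketch {
    private final TreeMap<Long, Integer> history = new TreeMap<>();

    void put(final long timestamp, final Integer value) {
        history.put(timestamp, value); // TreeMap permits null values
    }

    /**
     * Returns the value of the entry with the largest timestamp <= asOfTimestamp
     * (the bound is inclusive), or null if there is none or if that entry is a tombstone.
     */
    Integer getAsOf(final long asOfTimestamp) {
        final Map.Entry<Long, Integer> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

Using the example history with day numbers as timestamps (1, 5, 10, 15, 20), a lookup as of day 17 lands on the put at day 15 and returns 2, while a lookup as of day 12 lands on the tombstone at day 10 and returns null.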
Compatibility, Deprecation, and Migration Plan
- Since this is a completely new set of APIs, no backward compatibility concerns are anticipated.
- Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.
Rejected Alternatives
To retrieve consecutive tombstones, we could add a method or flag (disabled by default) that allows users to get all tombstones. If this turns out to be a real use case for users, it can be added later.
Test Plan
The single-key_multi-timestamp interactive queries will be tested in the versioned store IQv2 integration tests (as is done for non-versioned key queries). Moreover, unit tests will be added wherever needed.