...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
KIP Tracking
To control the scope of this long-term project, we chose to defer much of the details to later KIPs. Here's a roundup of the related work:
- KIP-805: Add range and scan query support in IQ v2
- KIP-806: Add session and window query over kv-store in IQv2
Motivation
Kafka Streams supports an interesting and innovative API for "peeking" into the internal state of running stateful stream processors from outside of the application, called Interactive Query (IQ). This functionality has proven invaluable to users over the years for everything from debugging running applications to serving low-latency queries straight from the Streams runtime.
...
Code Block
public interface StateStore {
    ...

    /**
     * Execute a query. Returns a QueryResult containing either result data or
     * a failure.
     * <p>
     * If the store doesn't know how to handle the given query, the result
     * will be a {@link FailureReason#UNKNOWN_QUERY_TYPE}.
     * If the store couldn't satisfy the given position bound, the result
     * will be a {@link FailureReason#NOT_UP_TO_BOUND}.
     * @param query The query to execute
     * @param positionBound The position the store must be at or past
     * @param collectExecutionInfo Whether the store should collect detailed execution info for the query
     * @param <R> The result type
     */
    @Evolving
    default <R> QueryResult<R> query(Query<R> query,
                                     PositionBound positionBound,
                                     boolean collectExecutionInfo) {
        // If a store doesn't implement a query handler, then all queries are unknown.
        return QueryResult.forUnknownQueryType(query, this);
    }

    /**
     * Returns the position the state
     * store is at with respect to the input topic/partitions
     */
    @Evolving
    default Position getPosition() {
        throw new UnsupportedOperationException(
            "getPosition is not implemented by this StateStore (" + getClass() + ")"
        );
    }

    ...
}
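To sketch what this looks like from the store's side (illustrative only, not part of the proposal: the store class below is imagined, it uses the RawKeyQuery example described later in this document, and it assumes a QueryResult.forResult success factory alongside the failure factories above), an implementation dispatches on the concrete query type and falls back to the default for anything it doesn't recognize:
Code Block
// Illustrative sketch only: a hypothetical in-memory byte store's handler.
// QueryResult.forResult is assumed; the other StateStore methods are omitted.
public class HypotheticalInMemoryStore implements StateStore {
    private final NavigableMap<Bytes, byte[]> data = new TreeMap<>();

    @Override
    public <R> QueryResult<R> query(final Query<R> query,
                                    final PositionBound positionBound,
                                    final boolean collectExecutionInfo) {
        // A real implementation would also check positionBound and return a
        // NOT_UP_TO_BOUND failure if this store hasn't caught up to it yet.
        if (query instanceof RawKeyQuery) {
            final RawKeyQuery rawKeyQuery = (RawKeyQuery) query;
            // RawKeyQuery's result type is byte[], so this cast is safe.
            @SuppressWarnings("unchecked")
            final QueryResult<R> result =
                (QueryResult<R>) QueryResult.forResult(data.get(rawKeyQuery.getKey()));
            return result;
        } else {
            // Same behavior as the default: report an unknown query type.
            return QueryResult.forUnknownQueryType(query, this);
        }
    }

    // ... remaining StateStore methods omitted for brevity ...
}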
Query
This is the interface that all queries must implement. Part of what I'm going for here is to place as few restrictions as possible on what a "query" or its "result" is, so the only thing in this interface is a generic type indicating the result type, which lets Streams return a fully typed response without predetermining the response type itself.
Code Block
@Evolving
public interface Query<R> { }
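For a sense of how small that contract is, here is a toy query (purely hypothetical, proposed nowhere in this KIP) whose result is a simple count:
Code Block
// Toy example only, not proposed in this KIP: the Query interface imposes
// nothing beyond the result type parameter, here a plain Long.
@Evolving
public class HypotheticalCountQuery implements Query<Long> {
    private HypotheticalCountQuery() { }

    public static HypotheticalCountQuery count() {
        return new HypotheticalCountQuery();
    }
}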
Using this interface, store implementers can create any query they can think of and return any type they like, from a single value to an iterator, even a future. While writing the POC, I implemented three queries for KeyValue stores, which I'll include here as examples:
Proposed Query: KeyQuery
This query implements the functionality of the current KeyValueStore#get(key) method:
Code Block
@Evolving
public class KeyQuery<K, V> implements Query<V> {

    // static factory to create a new KeyQuery, given a key
    public static <K, V> KeyQuery<K, V> withKey(final K key);

    // getter for the key
    public K getKey();
}
// ======================================
// example usage in IQv2:

Integer key = 1;

// note that "mystore" is a KeyValueStore<Integer, ValueAndTimestamp<Integer>>,
// hence the result type
StateQueryRequest<ValueAndTimestamp<Integer>> query =
    inStore("mystore").withQuery(KeyQuery.withKey(key));

// run the query
StateQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);

// In this example, it doesn't matter which partition ran the query, so we just
// grab the result from the partition that returned something
// (this is what IQ currently does under the covers).
Integer value = result.getOnlyPartitionResult().getResult().value();
// ======================================
// For comparison, here is how the query would look in current IQ:
// (note, this glosses over many of the problems described above.
// the example is here to illustrate the different ergonomics
// of the two APIs.)
// create the query parameters
StoreQueryParameters<ReadOnlyKeyValueStore<Integer, ValueAndTimestamp<Integer>>> storeQueryParameters =
StoreQueryParameters.fromNameAndType(
"mystore",
QueryableStoreTypes.timestampedKeyValueStore()
);
// get a store handle to run the query
ReadOnlyKeyValueStore<Integer, ValueAndTimestamp<Integer>> store =
kafkaStreams.store(storeQueryParameters);
// query the store
Integer value1 = store.get(key).value();
We propose to add KeyQuery as part of the implementation of this KIP, in order to have at least one query available to flesh out tests, etc., for the framework. Other queries are deferred to later KIPs.
Example Query: RawKeyQuery
We could later propose to add a "raw" version of the KeyQuery, which simply takes the key as a byte array and returns the value as a byte array. This could allow callers to bypass the serialization logic in the Metered stores entirely, potentially saving valuable CPU time and memory.
Code Block
@Evolving
public class RawKeyQuery implements Query<byte[]> {
    public static RawKeyQuery withKey(final Bytes key);
    public static RawKeyQuery withKey(final byte[] key);
    public Bytes getKey();
}
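For illustration, usage would look something like the following (hypothetical, since RawKeyQuery itself is not proposed here): the caller serializes the key and receives the stored bytes back, untouched by the Metered layer's serdes.
Code Block
// Hypothetical usage of RawKeyQuery (not part of this KIP). The caller owns
// serialization, so the Metered stores' serde logic is bypassed entirely.
final byte[] rawKey = new IntegerSerializer().serialize("ignored", 1);

final StateQueryResult<byte[]> rawResult =
    kafkaStreams.query(inStore("mystore").withQuery(RawKeyQuery.withKey(rawKey)));

// The value comes back exactly as the raw bytes held by the inner store.
final byte[] rawValue = rawResult.getOnlyPartitionResult().getResult();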
Example query: RawScanQuery
This example demonstrates two variations on the first example:
- The ability to define queries handling "raw" binary data for keys and values
- The ability for a query to return multiple results (in this case, an iterator)
I'm only bundling those for brevity. We can also have typed, iterable queries and raw single-record queries.
Note this query is purely an example of what is possible. It will not be added as part of this KIP.
Code Block
public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
    private RawScanQuery() { }

    public static RawScanQuery scan() {
        return new RawScanQuery();
    }
}
// example usage
// since we're handling raw data, we're going to need the serdes
// this is just an example, this method is also not proposed in this KIP.
StateQuerySerdes<Integer, ValueAndTimestamp<Integer>> serdes =
kafkaStreams.serdesForStore("mystore");
// run the "scan" query
StateQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
kafkaStreams.query(inStore("mystore").withQuery(RawScanQuery.scan()));
// This time, we'll get results from all locally available partitions.
Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
scanResult.getPartitionResults();
// for this example, we'll just collate all the partitions' iterators
// together and print their data
List<KeyValueIterator<Bytes, byte[]>> iterators =
partitionResults
.values()
.stream()
.map(QueryResult::getResult)
.collect(Collectors.toList());
// Using an example convenience method we could add to collate iterators' data
try (CloseableIterator<KeyValue<Bytes, byte[]>> collated = Iterators.collate(iterators)) {
    while (collated.hasNext()) {
KeyValue<Bytes, byte[]> next = collated.next();
System.out.println(
"|||" +
" " + serdes.keyFrom(next.key.get()) +
" " + serdes.valueFrom(next.value)
);
}
}
...
EDIT: The KIP initially proposed to have a "latest" bound, which would require the query to run on a RUNNING Active task, but that has been moved to StateQueryRequest#requireActive. This more cleanly separates responsibilities, since StateStores are responsible for enforcing the PositionBound and Kafka Streams is responsible for handling the other details of StateQueryRequest. The problem was that the store actually doesn't have visibility into the state or type of task that contains it, so it was unable to take responsibility for the "latest" bound. It's important in particular to keep the StateStore's responsibilities clear because that is an extension point for Kafka Streams users who implement their own stores.
Code Block
@Evolving
public class PositionBound {

    // Create a new Position bound representing "no bound"
    static PositionBound unbounded();

    // Create a new Position bound representing only the given position
    static PositionBound at(Position position);

    // Check whether this is an "unbounded" Position bound
    boolean isUnbounded();

    // Get the Position (if it's not unbounded)
    Position position();
}
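As a concrete (and hypothetical) round trip, the position observed by one query can bound a later one, giving a read-your-writes flavor of consistency across instances. The getPosition accessor on the result and the withPositionBound setter on the request are assumptions of this sketch; only PositionBound itself is specified above.
Code Block
// Hypothetical sketch: result.getPosition() and withPositionBound(...) are
// assumed APIs here. Bound a second query by the first one's observed position.
Integer key = 1;
StateQueryResult<ValueAndTimestamp<Integer>> first =
    kafkaStreams.query(inStore("mystore").withQuery(KeyQuery.withKey(key)));
Position observed = first.getPosition();

// Later, possibly against a standby or a restarted instance, demand data
// that is at least as fresh as what the first query saw.
StateQueryResult<ValueAndTimestamp<Integer>> second = kafkaStreams.query(
    inStore("mystore")
        .withQuery(KeyQuery.withKey(key))
        .withPositionBound(PositionBound.at(observed)));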
StateStoreContext
To support persistence of position information across restarts (for persistent state stores, which won't read the changelog), we need to add a mechanism for the store to be notified when Streams is in a consistent state (after commit and before processing). This operation is purely a contract between the persistent, inner state store and the {Global,Processor}StateManager. No other components need to invoke that operation, and no components need to invoke it on other stores, so registering a callback as part of register is better than adding a new method to the StateStore interface:
Code Block
interface StateStoreContext {
...
// UNCHANGED EXISTING METHOD FOR REFERENCE:
void register(final StateStore store,
              final StateRestoreCallback stateRestoreCallback);
// NEW METHOD:
/**
* Registers and possibly restores the specified storage engine.
*
* @param store the storage engine
* @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
* @param commitCallback a callback to be invoked upon successful task commit, in case the store
* needs to perform any state tracking when the task is known to be in
* a consistent state. If the store has no such state to track, it may
* use {@link StateStoreContext#register(StateStore, StateRestoreCallback)} instead.
* Persistent stores provided by Kafka Streams use this method to save
* their Position information to local disk, for example.
*
* @throws IllegalStateException if the store gets registered after initialization is already finished
* @throws StreamsException if the store's changelog does not contain the partition
*/
@Evolving
void register(final StateStore store,
final StateRestoreCallback stateRestoreCallback,
final CommitCallback commitCallback);
...
}
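For illustration, a persistent store's init method might use the new overload as below. This is only a sketch: restoreRecord and writePositionCheckpoint are imagined helpers, and the zero-argument CommitCallback shape is an assumption, since this section doesn't spell the interface out.
Code Block
// Sketch of a persistent store wiring up the new register overload in init().
// restoreRecord and writePositionCheckpoint are hypothetical helpers, and the
// zero-argument CommitCallback shape is an assumption of this example.
@Override
public void init(final StateStoreContext context, final StateStore root) {
    context.register(
        root,
        (key, value) -> restoreRecord(key, value),    // changelog restoration
        () -> writePositionCheckpoint(getPosition())  // runs after successful commit
    );
}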
Compatibility, Deprecation, and Migration Plan
...