...

Using this interface, store implementers can create any query they can think of and return any type they like, from a single value to an iterator, even a future. While writing the POC, I implemented three queries for KeyValue stores, which I'll include here as examples:

...

Proposed Query: KeyQuery

This query implements the functionality of the current KeyValueStore#get(key) method:

...

We propose to add KeyQuery as part of the implementation of this KIP, in order to have at least one query available to flesh out tests, etc., for the framework. Other queries are deferred to later KIPs.
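A minimal sketch of the shape such a typed query could take, mirroring the RawKeyQuery definition below (the exact signatures are given in the elided definition above; the names here are only illustrative):

Code Block
@Evolving
public class KeyQuery<K, V> implements Query<V> {
  // create a query for a single typed key
  public static <K, V> KeyQuery<K, V> withKey(final K key);

  // the key this query will look up
  public K getKey();
}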

...

Proposed Query: RawKeyQuery

We also propose to add a "raw" version of the KeyQuery, which simply takes the key as a byte array and returns the value as a byte array. This will be used to implement the KeyQuery, since in Streams the Metered store layers would "terminate" the typed KeyQuery by invoking the serializer on the key and then executing a RawKeyQuery on the lower layers with the serialized key. Those lower layers would return a serialized value, which the Metered store would deserialize and (using QueryResult#swapValue) convert to a typed value to return as the KeyQuery's result.

Having a separate query type is nice for StateStore implementations because they would otherwise have to assume that if they receive a KeyQuery<K,V>, it would always in practice be a KeyQuery<Bytes,byte[]> or similar. If we wanted to migrate (eg) from Bytes to byte[] or ByteBuffer for the key, there wouldn't really be a good way to do it with generics, but with a defined type, we could add and deprecate methods to get a migration path.

The same RawKeyQuery is well defined for IQv2 callers to execute, which allows them to bypass the serialization logic in the Metered stores entirely, potentially saving valuable CPU time and memory.

Code Block
@Evolving
public class RawKeyQuery implements Query<byte[]> {
  public static RawKeyQuery withKey(final Bytes key);
  public static RawKeyQuery withKey(final byte[] key);
  public Bytes getKey();
}
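
As a caller-side illustration of the point above (bypassing the Metered layer's serialization), here is a hedged sketch that reuses the query-plumbing calls from the RawScanQuery example below; the store name, key value, and use of Serdes.Integer() are purely illustrative:

Code Block
// serialize the key ourselves; the framework will not touch the bytes
byte[] rawKey = Serdes.Integer().serializer().serialize("", 1);

// run the raw query; each partition's value comes back as raw bytes
InteractiveQueryResult<byte[]> rawResult =
  kafkaStreams.query(inStore("mystore").withQuery(RawKeyQuery.withKey(rawKey)));

// deserialize with the store's serdes only when we actually need the typed value
InteractiveQuerySerdes<Integer, ValueAndTimestamp<Integer>> serdes =
  kafkaStreams.serdesForStore("mystore");
rawResult.getPartitionResults().forEach((partition, result) ->
  System.out.println(partition + ": " + serdes.valueFrom(result.getResult()))
);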


Example query: RawScanQuery

This example demonstrates two variations on the first example:

  1. The ability to define queries handling "raw" binary data for keys and values
  2. The ability for a query to return multiple results (in this case, an iterator)

I'm only bundling those for brevity. We can also have typed, iterable queries and raw single-record queries.

Note this query is purely an example of what is possible. It will not be added as part of this KIP.


Code Block
public class RawScanQuery implements Query<KeyValueIterator<Bytes, byte[]>> {
    private RawScanQuery() { }

    public static RawScanQuery scan() {
        return new RawScanQuery();
    }
}

// example usage

// since we're handling raw data, we're going to need the serdes
InteractiveQuerySerdes<Integer, ValueAndTimestamp<Integer>> serdes =
  kafkaStreams.serdesForStore("mystore");

// run the "scan" query
InteractiveQueryResult<KeyValueIterator<Bytes, byte[]>> scanResult =
  kafkaStreams.query(inStore("mystore").withQuery(RawScanQuery.scan()));

// This time, we'll get results from all locally available partitions.
Map<Integer, QueryResult<KeyValueIterator<Bytes, byte[]>>> partitionResults =
  scanResult.getPartitionResults();


// for this example, we'll just collate all the partitions' iterators
// together and print their data

List<KeyValueIterator<Bytes, byte[]>> iterators =
  partitionResults
    .values()
    .stream()
    .map(QueryResult::getResult)
    .collect(Collectors.toList());

// Using an example convenience method we could add to collate iterators' data
try (CloseableIterator<KeyValue<Bytes, byte[]>> collated = Iterators.collate(iterators)) {
  while (collated.hasNext()) {
    KeyValue<Bytes, byte[]> next = collated.next();
    System.out.println(
      "|||" +
        " " + serdes.keyFrom(next.key.get()) +
        " " + serdes.valueFrom(next.value)
    );
  }
}
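
The Iterators.collate call above refers to a convenience method we could add, not something that exists today. A minimal sketch of what it might look like, assuming the CloseableIterator type used in the example (it simply chains the per-partition iterators one after another and closes them all on close):

Code Block
// Hypothetical helper, sketched for illustration only.
public final class Iterators {

  public static <K, V> CloseableIterator<KeyValue<K, V>> collate(
      final List<KeyValueIterator<K, V>> iterators) {

    return new CloseableIterator<KeyValue<K, V>>() {
      private int current = 0;

      @Override
      public boolean hasNext() {
        // skip past any per-partition iterators that are already exhausted
        while (current < iterators.size() && !iterators.get(current).hasNext()) {
          current++;
        }
        return current < iterators.size();
      }

      @Override
      public KeyValue<K, V> next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return iterators.get(current).next();
      }

      @Override
      public void close() {
        // make sure every partition's iterator gets released
        iterators.forEach(KeyValueIterator::close);
      }
    };
  }

  private Iterators() { }
}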

QueryResult

This is a container for a single partition's query result.
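
The full definition is elided here. Based only on the methods referenced elsewhere on this page (QueryResult::getResult in the scan example and QueryResult#swapValue in the RawKeyQuery discussion), its shape is roughly the following; the signatures are illustrative, not normative:

Code Block
@Evolving
public class QueryResult<R> {
  // the result computed by one partition of the store
  public R getResult();

  // exchange the contained result for a transformed one, e.g. after the
  // Metered layer deserializes a raw byte[] result into the typed value
  public <NewR> QueryResult<NewR> swapValue(final NewR value);
}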

...

Since the scope of this KIP is so large, we plan a short period of interface instability (during which all the IQv2 APIs are marked @Evolving ) in case we discover a need to change this proposal. During this time, we will have the option to break compatibility in major (eg 4.0) or minor (eg 3.1), but not bugfix (eg 3.0.1) releases. We hope not to make use of this option, though. We plan to drop these annotations (in a later KIP) once IQv2 is complete enough to have confidence in the current design.

Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.

Rejected Alternatives

Incrementally improve the current IQ API

Some of the motivating problems could be addressed within the scope of the current IQ API, but many are inherent in the current API's design. The full set of problems convinced us that it's time for a new API.

Defer the Position aspects to a later KIP

This would certainly simplify this KIP, but it would also mean we need to update the StateStore interface twice. If the Position proposal generates too much controversy, we will split it into a separate KIP.

Defer the Position functionality to individual StateStore/Query implementations

The IQv2 design leaves the door open for Queries and StateStores to coordinate to provide all kinds of functionality on top of "just" returning responses to queries. For example, new Queries and StateStores can be implemented to return asynchronous Future<R>  results. So perhaps it is also a good idea to push the idea of bounding the Position into the "user space" of Query/StateStore implementations as well. We think that it could be done, but there are also advantages to making it a framework feature. The biggest one is simply uniformity. The framework Position tracking and bounding can ensure a consistent API across all query and store implementations, whereas pushing it to the implementations would make it merely conventional, raising future interoperability concerns. Position data also needs to be transmitted to standby replicas and made fault-tolerant, all of which becomes more complex in "user space" than in the framework.
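
As a small, purely hypothetical illustration of that flexibility (not proposed in this KIP), a query can opt into asynchronous results simply by choosing a future as its result type:

Code Block
// Hypothetical query type: the store implementation that recognizes it
// is responsible for completing the returned future.
public class AsyncKeyQuery<K, V> implements Query<CompletableFuture<V>> {
  public static <K, V> AsyncKeyQuery<K, V> withKey(final K key);
  public K getKey();
}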

Improve Query dispatch performance

This was inspired by the fact that we don't have a great mechanism to dispatch Queries to their execution methods inside the state store. Presently, we would have to use either a sequence of if/else type checks or a switch statement in the StateStore#execute implementation, or some other approach like a HashMap of Query → method. Java 17 has the ability to switch over the class itself (see https://openjdk.java.net/jeps/406), but that's obviously a long way off for Apache Kafka.
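
To make the problem concrete, this is roughly the dispatch code a store has to write today; the execute signature is simplified and the handle* helpers are hypothetical:

Code Block
@SuppressWarnings("unchecked")
public <R> QueryResult<R> execute(final Query<R> query) {
  // each branch checks the concrete query type and forwards to a store-specific handler
  if (query instanceof KeyQuery) {
    return (QueryResult<R>) handleKeyQuery((KeyQuery<?, ?>) query);
  } else if (query instanceof RawKeyQuery) {
    return (QueryResult<R>) handleRawKeyQuery((RawKeyQuery) query);
  } else if (query instanceof RawScanQuery) {
    return (QueryResult<R>) handleRawScanQuery((RawScanQuery) query);
  } else {
    // queries this store does not recognize
    throw new IllegalArgumentException("Unknown query type: " + query);
  }
}

With Java 17's pattern matching for switch (JEP 406), this chain could collapse into a switch over the query's class, but as noted, that is not yet available to Apache Kafka.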

Host the execution logic in the Query instead of the StateStore

Since the number of Query implementations will probably be much larger than the number of StateStore implementations, we can move the type checks from StateStore to Query by replacing StateStore#execute(Query) with Query#executeOn(StateStore). Then, instead of having StateStore implementations define how to behave for each query, we would have the Queries define how to run on each StateStore.
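
Under that rejected shape, the dispatch would be inverted, roughly as sketched below; the store-type checks shown are illustrative only:

Code Block
// Rejected alternative, sketched for comparison: the Query dispatches over store types.
public class KeyQuery<K, V> implements Query<V> {

  @SuppressWarnings("unchecked")
  public QueryResult<V> executeOn(final StateStore store) {
    if (store instanceof KeyValueStore) {
      return executeOnKeyValueStore((KeyValueStore<K, V>) store); // hypothetical helper
    } else if (store instanceof SessionStore) {
      return executeOnSessionStore((SessionStore<K, V>) store);   // hypothetical helper
    } else {
      // a new or custom store type cannot be supported without modifying this class
      throw new IllegalArgumentException("Unsupported store type: " + store);
    }
  }
}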

The reason we didn't pursue this idea is that it makes it difficult for new (especially user-defined) StateStore implementations to re-use existing query types. For example, the KeyQuery/RawKeyQuery we propose to add in this KIP is very general, and presumably lots of custom stores would want to support it, but they would be unable to if doing so requires modification to the Query's implementation itself.