...

Status

Current state: Under Discussion

Discussion thread

JIRA: KAFKA-15257 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The main goal is to support interactive queries on versioned state stores (KIP-889) in Apache Kafka. This KIP considers the following query types for implementation.

Range Queries

  1. key-range latest-value query
  2. key-range with lower bound latest-value query
  3. key-range with upper bound latest-value query
  4. all-keys (no bound) latest-value query
  5. key-range query with timestamp (upper) bound
  6. key-range with lower bound with timestamp (upper) bound 
  7. key-range with upper bound with timestamp (upper) bound
  8. all-keys (no bound) with timestamp (upper) bound
  9. key-range query with timestamp range
  10. key-range query with lower bound with timestamp range
  11. key-range query with upper bound with timestamp range
  12. all-keys (no bound) with timestamp range
  13. key-range query all-versions
  14. key-range query with lower bound all-versions
  15. key-range query with upper bound all-versions
  16. all-keys query (no bound) all-versions (entire store)

Public Interfaces

In this KIP we propose two new public classes, VersionedKeyQuery and VersionedRangeQuery, which are described in the next section. In addition, the public interface ValueIterator is used to iterate over the values returned by a single-key query, where each value corresponds to one timestamp.

Proposed Changes

To support range queries, the VersionedRangeQuery class is used.
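
The sixteen range query types listed under Motivation can be read as combinations of an optional lower and upper key bound with a timestamp selection. The sketch below models that reading with a small in-memory structure. It is not the proposed VersionedRangeQuery API and not the Kafka Streams store: the class VersionedStoreSketch and its methods are hypothetical names introduced only for illustration. It also assumes that key and timestamp bounds are inclusive, and it treats a "timestamp (upper) bound" query as a timestamp range with no lower bound. None of these points is specified in this KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model for illustration; not the Kafka Streams store or the proposed API.
final class VersionedStoreSketch {
    // key -> (record timestamp -> value), both levels sorted ascending.
    private final NavigableMap<Integer, NavigableMap<Long, String>> data = new TreeMap<>();

    void put(final int key, final long timestamp, final String value) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Keys in [lower, upper]; a null bound leaves that side open (inclusive bounds are an assumption).
    private NavigableMap<Integer, NavigableMap<Long, String>> keys(final Integer lower, final Integer upper) {
        if (lower == null && upper == null) {
            return data;
        }
        if (lower == null) {
            return data.headMap(upper, true);
        }
        if (upper == null) {
            return data.tailMap(lower, true);
        }
        return data.subMap(lower, true, upper, true);
    }

    // Query types 1-4: the newest version of every key in the key range.
    List<String> latest(final Integer lower, final Integer upper) {
        final List<String> out = new ArrayList<>();
        for (final Map.Entry<Integer, NavigableMap<Long, String>> e : keys(lower, upper).entrySet()) {
            final Map.Entry<Long, String> v = e.getValue().lastEntry();
            out.add(e.getKey() + "@" + v.getKey() + "=" + v.getValue());
        }
        return out;
    }

    // Query types 5-16: every version with from <= timestamp <= to.
    // Use Long.MIN_VALUE / Long.MAX_VALUE for an open timestamp side.
    List<String> versions(final Integer lower, final Integer upper, final long from, final long to) {
        final List<String> out = new ArrayList<>();
        for (final Map.Entry<Integer, NavigableMap<Long, String>> e : keys(lower, upper).entrySet()) {
            for (final Map.Entry<Long, String> v : e.getValue().subMap(from, true, to, true).entrySet()) {
                out.add(e.getKey() + "@" + v.getKey() + "=" + v.getValue());
            }
        }
        return out;
    }
}
```

Under these assumptions, query type 1 corresponds to latest(lower, upper), type 4 to latest(null, null), type 9 to versions(lower, upper, from, to), and type 16 to versions(null, null, Long.MIN_VALUE, Long.MAX_VALUE).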


Examples

The following example illustrates the use of the VersionedRangeQuery class to query a versioned state store.

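// Imports for the existing Kafka Streams classes used below. VersionedRangeQuery is
// the class proposed in this KIP, so its import is omitted here.
import java.util.Map;
import java.util.Map.Entry;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;

// Range query over keys 1 to 2, restricted to the given timestamp range.
final VersionedRangeQuery<Integer, Integer> query =
        VersionedRangeQuery.keyRangeWithTimestampRange(1, 2, 1690201149, 1690373949);

final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request =
        inStore("my_store").withQuery(query);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult = kafkaStreams.query(request);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults = versionedRangeResult.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    // KeyValueIterator is Closeable, so try-with-resources releases it after iteration.
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            final long timestamp = record.value.timestamp();
        }
    }
}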


Compatibility, Deprecation, and Migration Plan

  • Since this is a completely new set of APIs, no backward compatibility concerns are anticipated. 
  • Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.

...