Status

Current state: Adopted

Discussion thread: here 

JIRA: KAFKA-13494

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Provide an implementation of the Query interface, introduced in KIP-796: Interactive Query v2, to support session and window queries.

Public Interfaces

In this KIP we propose two new public classes:

WindowKeyQuery

This query type is used to search for occurrences of keys in window stores in combination with IQv2. In particular, the following mappings are established:

  • WindowStore.fetch(K key, Instant timeFrom, Instant timeTo) ← WindowKeyQuery.withKeyAndWindowStartRange(key, from, to)
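The mapping above can be pictured with a toy, standard-library-only model. This is a minimal sketch, not the Kafka Streams implementation: ToyWindowStore and WindowKeyQuerySketch are hypothetical names, and the sketch assumes that both ends of the window start range are inclusive and that results come back in ascending window-start order.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy stand-in for a window store: key -> (window start in epoch millis -> value).
// Hypothetical illustration only; this is not the Kafka Streams WindowStore.
class ToyWindowStore<K, V> {
    private final Map<K, NavigableMap<Long, V>> data = new HashMap<>();

    void put(final K key, final V value, final Instant windowStart) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart.toEpochMilli(), value);
    }

    // Same shape as fetch(key, timeFrom, timeTo): a single key, and only the
    // windows whose start lies in [timeFrom, timeTo] (assumed inclusive).
    List<Map.Entry<Long, V>> fetch(final K key, final Instant timeFrom, final Instant timeTo) {
        final NavigableMap<Long, V> windows = data.get(key);
        if (windows == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(
            windows.subMap(timeFrom.toEpochMilli(), true, timeTo.toEpochMilli(), true).entrySet());
    }
}

class WindowKeyQuerySketch {
    // Returns the window start timestamps matched for key "a".
    static List<Long> demo() {
        final ToyWindowStore<String, Integer> store = new ToyWindowStore<>();
        store.put("a", 1, Instant.ofEpochMilli(0L));
        store.put("a", 2, Instant.ofEpochMilli(60_000L));
        store.put("a", 3, Instant.ofEpochMilli(120_000L));
        store.put("b", 9, Instant.ofEpochMilli(60_000L)); // other key, never returned

        final List<Long> starts = new ArrayList<>();
        for (final Map.Entry<Long, Integer> e
                : store.fetch("a", Instant.ofEpochMilli(30_000L), Instant.ofEpochMilli(120_000L))) {
            starts.add(e.getKey());
        }
        return starts;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

Each result entry is keyed only by the window start timestamp, because the record key is already fixed by the query.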

WindowRangeQuery

This query type is used to search per-window aggregates of keys in window and session stores in combination with IQv2. In particular, the following mappings are established:

  • WindowStore.fetchAll(Instant timeFrom, Instant timeTo) ← WindowRangeQuery.withWindowStartRange(from, to)
  • SessionStore.fetch(Bytes key) ← WindowRangeQuery.withKey(key)
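The two mappings above select along different axes: one by window start time across all keys, the other by key across all sessions. The following is a minimal sketch using only the standard library. ToyWindowedKey and WindowRangeQuerySketch are hypothetical names, values are left out for brevity, and inclusive time bounds are an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a windowed key: the record key plus its window bounds.
final class ToyWindowedKey {
    final String key;
    final long start;
    final long end;

    ToyWindowedKey(final String key, final long start, final long end) {
        this.key = key;
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return key + "@[" + start + "," + end + "]";
    }
}

class WindowRangeQuerySketch {
    // Shape of fetchAll(timeFrom, timeTo): every key, restricted to windows
    // whose start lies in [from, to] (assumed inclusive).
    static List<ToyWindowedKey> fetchAll(final List<ToyWindowedKey> store, final long from, final long to) {
        final List<ToyWindowedKey> out = new ArrayList<>();
        for (final ToyWindowedKey w : store) {
            if (w.start >= from && w.start <= to) {
                out.add(w);
            }
        }
        return out;
    }

    // Shape of SessionStore.fetch(key): every session recorded for one key.
    static List<ToyWindowedKey> fetchByKey(final List<ToyWindowedKey> store, final String key) {
        final List<ToyWindowedKey> out = new ArrayList<>();
        for (final ToyWindowedKey w : store) {
            if (w.key.equals(key)) {
                out.add(w);
            }
        }
        return out;
    }

    static String demo() {
        final List<ToyWindowedKey> store = Arrays.asList(
            new ToyWindowedKey("a", 0L, 10L),
            new ToyWindowedKey("b", 5L, 15L),
            new ToyWindowedKey("a", 20L, 30L));
        return fetchAll(store, 5L, 20L) + " | " + fetchByKey(store, "a");
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

In both cases the result is keyed by a windowed key, which is why a single query type can serve both stores.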

There are multiple discussion points here:

  • One reason we cannot use WindowKeyQuery to support SessionStore.fetch(key) is that SessionStore.fetch(key) returns a windowed key-value iterator, KeyValueIterator<Windowed<Bytes>, byte[]>, whereas WindowKeyQuery is bound to Query<WindowStoreIterator<V>>.
  • WindowRangeQuery does not currently support key ranges. We could add more cases here to get better coverage of the WindowStore and SessionStore APIs. In this KIP, however, we focus on a subset of queries and defer support for additional cases to future KIPs.
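The result-type mismatch in the first point can be made concrete with a small generic sketch. All types below (SketchQuery, SketchWindowed, SketchWindowKeyQuery, SketchWindowRangeQuery) are hypothetical local stand-ins, not the Kafka Streams classes. The sketch assumes a timestamp-keyed iterator for the single-key query and a windowed-key iterator for the range query.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: a query fixes its result type R at compile time.
interface SketchQuery<R> {
    R run();
}

// A record key together with the bounds of the window or session it belongs to.
final class SketchWindowed<K> {
    final K key;
    final long start;
    final long end;

    SketchWindowed(final K key, final long start, final long end) {
        this.key = key;
        this.start = start;
        this.end = end;
    }
}

// Single-key query: results are keyed by window start timestamp only.
final class SketchWindowKeyQuery<V> implements SketchQuery<Iterator<Map.Entry<Long, V>>> {
    private final List<Map.Entry<Long, V>> rows;

    SketchWindowKeyQuery(final List<Map.Entry<Long, V>> rows) {
        this.rows = rows;
    }

    @Override
    public Iterator<Map.Entry<Long, V>> run() {
        return rows.iterator();
    }
}

// Range or session query: results are keyed by a windowed key.
final class SketchWindowRangeQuery<K, V> implements SketchQuery<Iterator<Map.Entry<SketchWindowed<K>, V>>> {
    private final List<Map.Entry<SketchWindowed<K>, V>> rows;

    SketchWindowRangeQuery(final List<Map.Entry<SketchWindowed<K>, V>> rows) {
        this.rows = rows;
    }

    @Override
    public Iterator<Map.Entry<SketchWindowed<K>, V>> run() {
        return rows.iterator();
    }
}

class ResultTypeSketch {
    static String demo() {
        final List<Map.Entry<Long, String>> windowRows = new ArrayList<>();
        windowRows.add(new SimpleImmutableEntry<>(0L, "v1"));
        final List<Map.Entry<SketchWindowed<String>, String>> sessionRows = new ArrayList<>();
        sessionRows.add(new SimpleImmutableEntry<>(new SketchWindowed<>("a", 0L, 10L), "s1"));

        // The two iterators have unrelated element types, so neither query
        // class can return the other's result without changing its signature.
        final Map.Entry<Long, String> w = new SketchWindowKeyQuery<String>(windowRows).run().next();
        final Map.Entry<SketchWindowed<String>, String> s =
            new SketchWindowRangeQuery<String, String>(sessionRows).run().next();

        return w.getKey() + " -> " + w.getValue() + "; "
            + s.getKey().key + "[" + s.getKey().start + "," + s.getKey().end + "] -> " + s.getValue();
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```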

Proposed Changes

The WindowKeyQuery class:

Code Block
@Evolving
public class WindowKeyQuery<K, V> implements Query<WindowStoreIterator<V>> {

    // Signatures only; method bodies are omitted.

    public static <K, V> WindowKeyQuery<K, V> withKeyAndWindowStartRange(final K key, final Instant timeFrom, final Instant timeTo);

    public K getKey();

    public Optional<Instant> getTimeFrom();

    public Optional<Instant> getTimeTo();
}

The WindowRangeQuery class:

Code Block
@Evolving
public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {

    // Signatures only; method bodies are omitted.

    public static <K, V> WindowRangeQuery<K, V> withKey(final K key);

    public static <K, V> WindowRangeQuery<K, V> withWindowStartRange(final Instant timeFrom, final Instant timeTo);

    public K getKey();

    public Optional<Instant> getTimeFrom();

    public Optional<Instant> getTimeTo();
}

Examples

The following example illustrates the use of the WindowKeyQuery class.

Code Block
// key is the GenericKey to look up; it is assumed to be defined elsewhere.
// inStore is the static factory StateQueryRequest.inStore.
final Instant timeTo = Instant.now();
final Instant timeFrom = timeTo.minusSeconds(60);

final WindowKeyQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowKeyQuery.withKeyAndWindowStartRange(key, timeFrom, timeTo);

final StateQueryRequest<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> request = inStore("rocksdb-window-store").withQuery(query);

// run the query
final StateQueryResult<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> result = kafkaStreams.query(request);
final WindowStoreIterator<ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult();

The following example illustrates the use of the WindowRangeQuery class to query a window store.

Code Block
final Instant timeTo = Instant.now();
final Instant timeFrom = timeTo.minusSeconds(60);

final WindowRangeQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowRangeQuery.withWindowStartRange(timeFrom, timeTo);

final StateQueryRequest<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> request = inStore("inmemory-window-store").withQuery(query);

// run the query
final StateQueryResult<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> result = kafkaStreams.query(request);
final KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult();


Compatibility, Deprecation, and Migration Plan

...