Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: "Under Discussion"Adopted

Discussion thread: here 

JIRA: KAFKA-13494

...

In this KIP we propose two new public classes:

WindowKeyQuery

...

This type used to search occurrences of keys in window stores in combination with IQv2

...

. In particular the following mappings are established:

  • WindowStore.fetch(K key, Instant timeFrom, Instant timeTo) ← WindowKeyQuery.withKeyAndWindowStartRange(key, from, to)

WindowRangeQuery

The query type is used to search per window aggregates of keys in window and session stores in combination with IQv2. In particular the following mappings are established:

  • WindowStore.fetchAll(Instant timeFrom, Instant timeTo) ← WindowRangeQuery.withWindowStartRange(from, to)
  • SessionStore.fetch(Bytes key) ← WindowRangeQuery.withKey(key)

There are multiple discussion points here:

  • One reason we cannot use WindowKeyQuery to support SessionStore.fetch(key) is because SessionStore.fetch(key) returns a windowed key value iterator, KeyValueIterator<Windowed<Bytes>, byte[]>, whereas WindowKeyQuery is bound to Query<WindowStoreIterator<V>>. 
  • WindowRangeQuery does not currently support key ranges. We could add more cases here to get better coverage of WindowStore and SessionStore API. In this KIP, however, we focus on a subset of queries and defer support for additional cases to future KIPs.

Proposed Changes

The WindowKeyQuery class:

Code Block
@Evolving
public class WindowKeyQuery<K, V> implements Query<WindowStoreIterator<V>> {

    public static <K, V> WindowKeyQuery<K, V> withKeywithKeyAndWindowStartRange(final K key);

    public static <K, V> WindowKeyQuery<K, V> withKeyAndWindowBounds(final K key, final Instant windowLowertimeFrom, final Instant windowUppertimeTo);

    public K getKey();

    public Optional<Instant> getWindowLowerBoundgetTimeFrom();

    public Optional<Instant> getWindowUpperBoundgetTimeTo();
}

The WindowRangeQuery class:

Code Block
@Evolving
public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {

    private WindowRangeQuery(final Optional<K> key, final Optional<Instant> windowLower, final Optional<Instant> windowUpper);

    public static <K, V> WindowRangeQuery<K, V> withKey(final K key);          

    public static <K, V> WindowRangeQuery<K, V> withWindowRangewithWindowStartRange(final Instant windowLowertimeFrom, final Instant windowUppertimeTo);

    public Optional<K>K getKey();

    public  public Optional<Instant> getWindowLowerBoundgetTimeFrom();

    public Optional<Instant> getWindowUpperBoundgetTimeTo();
}

Examples

The following example illustrates the use of the WindowKeyQuey class.

Code Block
final Instant uppertimeTo = Instant.now();
final Instant lowertimeFrom = uppertimeTo.minusSeconds(60);
final WindowKeyQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowKeyQuery.withKeyAndWindowBoundswithKeyAndWindowStartRange(key, lowertimeFrom, uppertimeTo);

final StateQueryRequest<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> request = inStore("rocksdb-window-store").withQuery(query);
final StateQueryResult<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> result = stateStorekafkaStream.getKafkaStreams().query(request);
final WindowStoreIterator<ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult();

The following example illustrates the use of the WindowQuery class to query a window store.

Code Block
final Instant uppertimeTo = Instant.now();
final Instant lowertimeFrom = uppertimeTo.minusSeconds(60);

final WindowRangeQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowRangeQuery.withWindowRangewithWindowStartRange(lowertimeFrom, uppertimeTo);

final StateQueryRequest<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> request = inStore(stateStore.getStateStoreName())"inmemory-window-store").withQuery(query);
final StateQueryResult<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> result = stateStore.getKafkaStreams()kafkaStreams.query(request);
final KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult();

...