Status

Current state: "Under Discussion"

Discussion thread: here 

JIRA: KAFKA-13494

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Provide implementations of the Query interface, introduced in KIP-796: Interactive Query v2, to support session and window queries.

Public Interfaces

In this KIP we propose two new public classes:

WindowKeyQuery

This query type is used to look up occurrences of a single key in window stores through IQv2. In particular, it lets applications issue the equivalent of WindowStore.fetch(K key, Instant timeFrom, Instant timeTo).
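To make the time-range semantics concrete, here is a toy in-memory model of a single key's windows, indexed by window start time. It assumes the closed-interval behavior documented for WindowStore.fetch (both timeFrom and timeTo inclusive). ToyWindowIndex is a hypothetical name used only for illustration and is not part of Kafka Streams or of this KIP.

```java
import java.time.Instant;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy model, not a Kafka Streams class: the windows of ONE key,
// indexed by their start timestamp.
final class ToyWindowIndex<V> {
    private final NavigableMap<Instant, V> byWindowStart = new TreeMap<>();

    // Record the aggregate value for the window starting at windowStart.
    void put(final Instant windowStart, final V value) {
        byWindowStart.put(windowStart, value);
    }

    // Return the windows whose start time lies in [timeFrom, timeTo],
    // with both ends inclusive (assumed to mirror WindowStore.fetch).
    NavigableMap<Instant, V> fetch(final Instant timeFrom, final Instant timeTo) {
        return byWindowStart.subMap(timeFrom, true, timeTo, true);
    }
}
```

With windows starting at 0 s, 60 s, and 120 s, a fetch over the range 60 s to 120 s would return the last two windows in this model, because both bounds are inclusive.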

WindowRangeQuery

This query type is used to retrieve per-window aggregates from window and session stores through IQv2. On a window store, it lets applications issue the equivalent of WindowStore.fetchAll(Instant timeFrom, Instant timeTo). On a session store, it lets applications issue the equivalent of SessionStore.fetch(Bytes key).

Proposed Changes

The WindowKeyQuery class:

public class WindowKeyQuery<K, V> implements Query<WindowStoreIterator<V>> {

    public static <K, V> WindowKeyQuery<K, V> withKeyAndWindowBounds(final K key, final Instant windowLower, final Instant windowUpper);

    public K getKey();

    public Optional<Instant> getWindowLowerBound();

    public Optional<Instant> getWindowUpperBound();
}
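
The signatures above leave the body open. The following is a minimal sketch of how the static factory and the Optional-returning getters might fit together as an immutable value object. It is not the proposed implementation: QuerySketch stands in for the real Query interface so the snippet compiles without Kafka on the classpath, the result type is simplified to Iterable<V>, and treating a null bound as "unbounded" is an assumption the KIP text does not state.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

// Stand-in for org.apache.kafka.streams.query.Query (hypothetical name).
interface QuerySketch<R> { }

// Hypothetical skeleton; the KIP uses WindowStoreIterator<V> as the result type.
final class WindowKeyQuerySketch<K, V> implements QuerySketch<Iterable<V>> {
    private final K key;
    private final Optional<Instant> windowLower;
    private final Optional<Instant> windowUpper;

    private WindowKeyQuerySketch(final K key,
                                 final Optional<Instant> windowLower,
                                 final Optional<Instant> windowUpper) {
        this.key = Objects.requireNonNull(key, "key");
        this.windowLower = windowLower;
        this.windowUpper = windowUpper;
    }

    // Assumption: a null bound is mapped to Optional.empty(), i.e. "unbounded".
    public static <K, V> WindowKeyQuerySketch<K, V> withKeyAndWindowBounds(
            final K key, final Instant windowLower, final Instant windowUpper) {
        return new WindowKeyQuerySketch<>(
            key, Optional.ofNullable(windowLower), Optional.ofNullable(windowUpper));
    }

    public K getKey() { return key; }

    public Optional<Instant> getWindowLowerBound() { return windowLower; }

    public Optional<Instant> getWindowUpperBound() { return windowUpper; }
}
```

Keeping the constructor private and all fields final means a query object cannot change after it has been handed to a store, which is one reason to prefer static factories here.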

The WindowRangeQuery class:

public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {

    public static <K, V> WindowRangeQuery<K, V> withKey(final K key);          

    public static <K, V> WindowRangeQuery<K, V> withWindowStartRange(final Instant earliestWindowStartTime, final Instant latestWindowStartTime);

    public static <K, V> WindowRangeQuery<K, V> withWindowStartAndEndRange(final Instant earliestWindowStartTime, final Instant latestWindowEndTime);

    public Optional<Instant> getWindowLowerBound();

    public Optional<Instant> getWindowUpperBound();
}
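
One way to read the Optional return types is that each factory populates only the fields it was given. The sketch below illustrates that reading under stated assumptions: the class name WindowRangeQuerySketch is hypothetical, the implements clause is omitted so the snippet needs only the JDK, getKey() is an illustrative accessor that does not appear in the signature list above, and withWindowStartAndEndRange is left out because its exact semantics are not spelled out in this KIP.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

// Hypothetical skeleton, not the proposed implementation.
final class WindowRangeQuerySketch<K, V> {
    private final Optional<K> key;
    private final Optional<Instant> windowLower;
    private final Optional<Instant> windowUpper;

    private WindowRangeQuerySketch(final Optional<K> key,
                                   final Optional<Instant> windowLower,
                                   final Optional<Instant> windowUpper) {
        this.key = key;
        this.windowLower = windowLower;
        this.windowUpper = windowUpper;
    }

    // Key-only query (the session-store case): both time bounds stay empty.
    public static <K, V> WindowRangeQuerySketch<K, V> withKey(final K key) {
        return new WindowRangeQuerySketch<>(
            Optional.of(Objects.requireNonNull(key, "key")),
            Optional.empty(),
            Optional.empty());
    }

    // Time-range query (the window-store case): the key stays empty.
    public static <K, V> WindowRangeQuerySketch<K, V> withWindowStartRange(
            final Instant earliestWindowStartTime, final Instant latestWindowStartTime) {
        return new WindowRangeQuerySketch<>(
            Optional.empty(),
            Optional.ofNullable(earliestWindowStartTime),
            Optional.ofNullable(latestWindowStartTime));
    }

    // Illustrative accessor only; not listed in the KIP's signatures.
    public Optional<K> getKey() { return key; }

    public Optional<Instant> getWindowLowerBound() { return windowLower; }

    public Optional<Instant> getWindowUpperBound() { return windowUpper; }
}
```

A store implementation could then inspect which Optionals are present to decide whether the query maps to a key lookup or to a time-range scan.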

Examples

The following example illustrates the use of the WindowKeyQuery class.

// Query the windows of a single key over the last 60 seconds.
final Instant upper = Instant.now();
final Instant lower = upper.minusSeconds(60);
final WindowKeyQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowKeyQuery.withKeyAndWindowBounds(key, lower, upper);

final StateQueryRequest<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> request = inStore("rocksdb-window-store").withQuery(query);
final StateQueryResult<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> result = stateStore.getKafkaStreams().query(request);
final WindowStoreIterator<ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult();

The following example illustrates the use of the WindowRangeQuery class to query a window store.

final Instant upper = Instant.now();
final Instant lower = upper.minusSeconds(60);

final WindowRangeQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowRangeQuery.withWindowStartRange(lower, upper);

final StateQueryRequest<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> request = inStore("inmemory-window-store").withQuery(query);
final StateQueryResult<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> result = stateStore.getKafkaStreams().query(request);
final KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult();

Compatibility, Deprecation, and Migration Plan

  • Since this is a completely new set of APIs, no backward compatibility concerns are anticipated. 
  • Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.

