Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The `WindowStore` interface has multiple `fetch()` methods, but their return types are inconsistent: two methods return `WindowStoreIterator`, while all others return `KeyValueIterator`. We should align the return types and replace `WindowStoreIterator` with `KeyValueIterator`. Additionally, the returned key type should be `Windowed<K>` instead of a plain `Long` that encodes only the window-start timestamp. To achieve this, all existing `fetch()` methods need to be deprecated and replaced with new `range()` methods.
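
To illustrate why a `Long` key is less convenient than a windowed key, here is a minimal sketch using simplified stand-in types. `WindowedKey`, `ReturnTypeSketch`, and `toWindowed` are hypothetical names invented for this illustration and are not part of Kafka Streams; the window end is assumed to be start plus a fixed window size. With `Long` keys, the caller has to carry the queried key and the window size separately to recover what a windowed key would hold directly.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a windowed key; NOT Kafka's Windowed<K>.
final class WindowedKey<K> {
    final K key;
    final long windowStart;
    final long windowEnd;

    WindowedKey(K key, long windowStart, long windowEnd) {
        this.key = key;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return key + "@[" + windowStart + "," + windowEnd + ")";
    }
}

final class ReturnTypeSketch {
    // Rebuilds windowed keys from entries keyed only by window-start timestamp.
    // The caller must supply the key and window size, because the Long key
    // does not carry them.
    static <K, V> List<Map.Entry<WindowedKey<K>, V>> toWindowed(
            K key, long windowSizeMs, Map<Long, V> byWindowStart) {
        List<Map.Entry<WindowedKey<K>, V>> result = new ArrayList<>();
        for (Map.Entry<Long, V> e : byWindowStart.entrySet()) {
            long start = e.getKey();
            WindowedKey<K> windowed = new WindowedKey<>(key, start, start + windowSizeMs);
            result.add(new SimpleImmutableEntry<>(windowed, e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Long> countsByWindowStart = new TreeMap<>();
        countsByWindowStart.put(0L, 3L);
        countsByWindowStart.put(60_000L, 5L);
        for (Map.Entry<WindowedKey<String>, Long> e
                : toWindowed("user-1", 60_000L, countsByWindowStart)) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```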

Furthermore, we want to split up the metrics so that each method call can be tracked independently. At the moment, a single `fetch` metric is shared by all methods named `fetch()`, regardless of their parameter list.

Public Interfaces

We propose to deprecate interface `WindowStoreIterator` and the following `fetch(...)` methods. Furthermore, new `range(...)` methods are added:

Code Block
languagejava
package org.apache.kafka.streams.state;


@Deprecated
public interface WindowStoreIterator<V> extends KeyValueIterator<Long, V>, Closeable { }

public interface ReadOnlyWindowStore<K, V> {
  // already deprecated (cf. KIP-358)
  @Deprecated
  WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

  // newly deprecated
  @Deprecated
  V fetch(K key, long time);
  @Deprecated
  WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
  @Deprecated
  KeyValueIterator<Windowed<K>, V> all();

  // new
  V get(K key, long windowStartTime);
  KeyValueIterator<Windowed<K>, V> range(K key, Instant from, Instant to) throws IllegalArgumentException;
  KeyValueIterator<Windowed<K>, V> range(K from, K to, Instant fromTime, Instant toTime);
  KeyValueIterator<Windowed<K>, V> range(Instant from, Instant to) throws IllegalArgumentException;
  KeyValueIterator<Windowed<K>, V> range();
}

// note: `WindowStore` does not deprecate overloads with primitive types
//       hence, we need to deprecate them now, and add corresponding new methods
public interface WindowStore<K, V> {
  @Deprecated
  WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
  @Deprecated
  WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to);

  // new
  KeyValueIterator<Windowed<K>, V> range(K key, long timeFrom, long timeTo);
  KeyValueIterator<Windowed<K>, V> range(final K key, final Instant from, final Instant to);
  KeyValueIterator<Windowed<K>, V> range(K from, K to, long timeFrom, long timeTo);
  KeyValueIterator<Windowed<K>, V> range(final K from, final K to, final Instant fromTime, final Instant toTime);
  KeyValueIterator<Windowed<K>, V> range(long timeFrom, long timeTo);
  KeyValueIterator<Windowed<K>, V> range(final Instant from, final Instant to);
}

public interface ReadOnlySessionStore<K, AGG> {
  // deprecated
  @Deprecated
  KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
  @Deprecated
  KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);

  // added
  KeyValueIterator<Windowed<K>, AGG> range(final K key);
  KeyValueIterator<Windowed<K>, AGG> range(final K from, final K to);
}

public interface SessionStore<K, AGG> {
  // deprecated
  @Deprecated
  KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
  @Deprecated
  KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
  @Deprecated
  AGG fetchSession(final K key, final long startTime, final long endTime);

  // added
  KeyValueIterator<Windowed<K>, AGG> range(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
  KeyValueIterator<Windowed<K>, AGG> range(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
  AGG get(final K key, final long startTime, final long endTime);
}

// Metrics: kafka.streams
type = stream-[store-type]-metrics
client-id = [thread ID]
task-id = [task ID]
[store-type]-id = [store name]
level = DEBUG

// deprecated:
[store-type] = rocksdb-window-state / in-memory-window-state / rocksdb-session-state / in-memory-session-state
name = fetch-latency-avg
name = fetch-latency-max
name = fetch-rate
name = fetch-total

// new
[store-type] = rocksdb-window-state / in-memory-window-state
name = get-latency-avg
name = get-latency-max
name = get-rate
name = get-total

name = range-single-key-time-range-latency-avg
name = range-single-key-time-range-latency-max
name = range-single-key-time-range-rate
name = range-single-key-time-range-total

name = range-key-and-time-latency-avg
name = range-key-and-time-latency-max
name = range-key-and-time-rate
name = range-key-and-time-total

name = range-time-latency-avg
name = range-time-latency-max
name = range-time-rate
name = range-time-total

name = range-all-latency-avg
name = range-all-latency-max
name = range-all-rate
name = range-all-total

[store-type] = rocksdb-session-state / in-memory-session-state
name = range-key-latency-avg
name = range-key-latency-max
name = range-key-rate
name = range-key-total

name = range-single-key-time-range-latency-avg
name = range-single-key-time-range-latency-max
name = range-single-key-time-range-rate
name = range-single-key-time-range-total

name = range-single-key-latency-avg
name = range-single-key-latency-max
name = range-single-key-rate
name = range-single-key-total

name = range-key-and-time-latency-avg
name = range-key-and-time-latency-max
name = range-key-and-time-rate
name = range-key-and-time-total

name = get-latency-avg
name = get-latency-max
name = get-rate
name = get-total
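
As a rough illustration of what tracking each method independently means, the sketch below keeps one `-total` counter per operation name instead of a single shared `fetch` counter. `PerMethodCounters` and its methods are hypothetical and do not use the Kafka metrics API. The mapping of operation names to method overloads (for example, `range-all` for the no-argument `range()`) is an assumption inferred from the metric names listed above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical per-operation invocation counters; NOT the Kafka metrics API.
final class PerMethodCounters {
    private final Map<String, LongAdder> totals = new ConcurrentHashMap<>();

    // Records one invocation under "<operation>-total".
    void record(String operation) {
        totals.computeIfAbsent(operation + "-total", k -> new LongAdder()).increment();
    }

    // Returns the invocation count for the operation, or 0 if never recorded.
    long total(String operation) {
        LongAdder adder = totals.get(operation + "-total");
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        PerMethodCounters counters = new PerMethodCounters();
        counters.record("get");                          // assumed: get(key, windowStartTime)
        counters.record("range-single-key-time-range");  // assumed: range(key, from, to)
        counters.record("range-all");                    // assumed: range()
        counters.record("get");
        System.out.println("get-total = " + counters.total("get"));
        System.out.println("range-all-total = " + counters.total("range-all"));
        System.out.println("range-time-total = " + counters.total("range-time"));
    }
}
```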

Proposed Changes

We suggest replacing `WindowStoreIterator` with `KeyValueIterator`. However, we cannot change the return type of `WindowStore#fetch(K key, long timeFrom, long timeTo)` or `WindowStore#fetch(K key, Instant timeFrom, Instant timeTo)` right away because this would break backward compatibility. Instead, we deprecate the interface `WindowStoreIterator`, which signals to users that they need to rewrite their code and stop using `WindowStoreIterator`. We document this in all related JavaDocs to point users to `KeyValueIterator` instead. We also need to deprecate the corresponding methods and add new methods with a new name and return type (i.e., `fetch(K key, Instant from, Instant to)` → `range(K key, Instant from, Instant to)`). In a future release, we can actually remove `WindowStoreIterator`; at that point, all user code should already have been rewritten, so the change is safe. To avoid confusion about naming, we propose to deprecate all other `fetch()` methods and rename them to `range()`, too. This also includes renaming the methods on `ReadOnlySessionStore`.
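
The deprecate-and-add pattern described above can be sketched with simplified stand-in types. `WinKey`, `SketchWindowStore`, and `InMemorySketchStore` are hypothetical and are not the Kafka Streams interfaces. Giving `range()` a default implementation that delegates to `fetch()` is an assumption of this sketch; the proposal does not say how the new methods would be implemented.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a windowed key; NOT Kafka's Windowed<K>.
final class WinKey<K> {
    final K key;
    final long windowStart;

    WinKey(K key, long windowStart) {
        this.key = key;
        this.windowStart = windowStart;
    }
}

// Simplified stand-in interface; NOT the real WindowStore.
interface SketchWindowStore<K, V> {
    // Old method: kept for compatibility, keyed by window-start timestamp only.
    @Deprecated
    Iterator<Map.Entry<Long, V>> fetch(K key, long timeFrom, long timeTo);

    // New method with a new name and return type. The default body that
    // adapts fetch() is an assumption made for this illustration.
    default Iterator<Map.Entry<WinKey<K>, V>> range(K key, long timeFrom, long timeTo) {
        final Iterator<Map.Entry<Long, V>> old = fetch(key, timeFrom, timeTo);
        return new Iterator<Map.Entry<WinKey<K>, V>>() {
            @Override
            public boolean hasNext() {
                return old.hasNext();
            }

            @Override
            public Map.Entry<WinKey<K>, V> next() {
                Map.Entry<Long, V> e = old.next();
                WinKey<K> windowed = new WinKey<>(key, e.getKey());
                return new SimpleImmutableEntry<>(windowed, e.getValue());
            }
        };
    }
}

// Toy in-memory store that only implements the old method.
final class InMemorySketchStore implements SketchWindowStore<String, Long> {
    private final Map<String, TreeMap<Long, Long>> data = new HashMap<>();

    void put(String key, long windowStart, long value) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
    }

    @Override
    @Deprecated
    public Iterator<Map.Entry<Long, Long>> fetch(String key, long timeFrom, long timeTo) {
        TreeMap<Long, Long> windows = data.get(key);
        if (windows == null) {
            return Collections.emptyIterator();
        }
        // Both time bounds are treated as inclusive in this sketch.
        return windows.subMap(timeFrom, true, timeTo, true).entrySet().iterator();
    }
}

final class DeprecateAndAddDemo {
    public static void main(String[] args) {
        InMemorySketchStore store = new InMemorySketchStore();
        store.put("a", 0L, 1L);
        store.put("a", 10L, 2L);
        store.put("a", 20L, 3L);
        Iterator<Map.Entry<WinKey<String>, Long>> it = store.range("a", 5L, 20L);
        while (it.hasNext()) {
            Map.Entry<WinKey<String>, Long> e = it.next();
            System.out.println(e.getKey().key + "@" + e.getKey().windowStart + " -> " + e.getValue());
        }
    }
}
```

Existing callers of `fetch()` keep compiling (with a deprecation warning), while new code can move to `range()` and read the key and window start from each entry.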

Compatibility, Deprecation, and Migration Plan

Because we only deprecate an interface and do not change any other code, this change is backward compatible. In a future release, when the interface is actually removed and the return type changes, the deprecation period will have passed, so user code should already have been rewritten.

Test Plan

N/A

Rejected Alternatives

N/A