Status

Current state: "Under Discussion"

Discussion thread: [DISCUSS] KIP-439: Deprecate Interface WindowStoreIterator


Released: 2.3

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The `WindowStore` interface has multiple `fetch()` methods, but their return types are inconsistent: two methods return `WindowStoreIterator` while all others return `KeyValueIterator`. We should align the return types and replace `WindowStoreIterator` with `KeyValueIterator`. Additionally, the returned iterator should be keyed by `Windowed<K>` instead of only a `Long` that encodes the window-start timestamp. To achieve this, all existing `fetch()` methods need to be deprecated and replaced with new `range()` methods.

Furthermore, we want to split up the metrics so that each method call can be tracked independently. At the moment, a single `fetch` metric is shared by all methods named `fetch()`, regardless of their parameter list.

Public Interfaces

We propose to deprecate the `WindowStoreIterator` interface and the `fetch(...)`-style methods listed below, and to add new `range(...)` and `get(...)` methods:

package org.apache.kafka.streams.state;


@Deprecated
public interface WindowStoreIterator<V> extends KeyValueIterator<Long, V>, Closeable { /* body unchanged */ }

public interface ReadOnlyWindowStore<K, V> {
  // already deprecated (cf. KIP-358)
  @Deprecated
  WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

  // newly deprecated
  @Deprecated
  V fetch(K key, long time);
  @Deprecated
  WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
  @Deprecated
  KeyValueIterator<Windowed<K>, V> all();

  // new
  V get(K key, long windowStartTime);
  KeyValueIterator<Windowed<K>, V> range(K key, Instant from, Instant to) throws IllegalArgumentException;
  KeyValueIterator<Windowed<K>, V> range(K from, K to, Instant fromTime, Instant toTime);
  KeyValueIterator<Windowed<K>, V> range(Instant from, Instant to) throws IllegalArgumentException;
  KeyValueIterator<Windowed<K>, V> range();
}

// note: `WindowStore` does not yet deprecate the overloads with primitive types;
//       hence, we need to deprecate them now and add corresponding new methods
public interface WindowStore<K, V> {
  @Deprecated
  WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
  @Deprecated
  WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to);

  // new
  KeyValueIterator<Windowed<K>, V> range(K key, long timeFrom, long timeTo);
  KeyValueIterator<Windowed<K>, V> range(final K key, final Instant from, final Instant to);
  KeyValueIterator<Windowed<K>, V> range(K from, K to, long timeFrom, long timeTo);
  KeyValueIterator<Windowed<K>, V> range(final K from, final K to, final Instant fromTime, final Instant toTime);
  KeyValueIterator<Windowed<K>, V> range(long timeFrom, long timeTo);
  KeyValueIterator<Windowed<K>, V> range(final Instant from, final Instant to);
}

public interface ReadOnlySessionStore<K, AGG> {
  // deprecated
  @Deprecated
  KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
  @Deprecated
  KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);

  // added
  KeyValueIterator<Windowed<K>, AGG> range(final K key);
  KeyValueIterator<Windowed<K>, AGG> range(final K from, final K to);
}

public interface SessionStore<K, AGG> {
  // deprecated
  @Deprecated
  KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
  @Deprecated
  KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
  @Deprecated
  AGG fetchSession(final K key, final long startTime, final long endTime);

  // added
  KeyValueIterator<Windowed<K>, AGG> range(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
  KeyValueIterator<Windowed<K>, AGG> range(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
  AGG get(final K key, final long startTime, final long endTime);
}

// Metrics: kafka.streams
type = stream-[store-type]-metrics
client-id = [thread ID]
task-id = [task ID]
[store-type]-id = [store name]
level = DEBUG

// deprecated:
[store-type] = rocksdb-window-state / in-memory-window-state / rocksdb-session-state / in-memory-session-state
name = fetch-latency-avg
name = fetch-latency-max
name = fetch-rate
name = fetch-total

// new
[store-type] = rocksdb-window-state / in-memory-window-state
name = get-latency-avg
name = get-latency-max
name = get-rate
name = get-total

name = range-single-key-time-range-latency-avg
name = range-single-key-time-range-latency-max
name = range-single-key-time-range-rate
name = range-single-key-time-range-total

name = range-key-and-time-latency-avg
name = range-key-and-time-latency-max
name = range-key-and-time-rate
name = range-key-and-time-total

name = range-time-latency-avg
name = range-time-latency-max
name = range-time-rate
name = range-time-total

name = range-all-latency-avg
name = range-all-latency-max
name = range-all-rate
name = range-all-total

[store-type] = rocksdb-session-state / in-memory-session-state
name = range-key-latency-avg
name = range-key-latency-max
name = range-key-rate
name = range-key-total

name = range-single-key-time-range-latency-avg
name = range-single-key-time-range-latency-max
name = range-single-key-time-range-rate
name = range-single-key-time-range-total

name = range-single-key-latency-avg
name = range-single-key-latency-max
name = range-single-key-rate
name = range-single-key-total

name = range-key-and-time-latency-avg
name = range-key-and-time-latency-max
name = range-key-and-time-rate
name = range-key-and-time-total

name = get-latency-avg
name = get-latency-max
name = get-rate
name = get-total
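
The naming pattern in the metrics listing can be sketched in Java as a reading aid: every operation prefix is combined with the same four suffixes. The class and method names here (`StoreMetricNames`, `metricNames`) are hypothetical and not part of Kafka; only the prefixes and suffixes are taken from the listing.

```java
import java.util.ArrayList;
import java.util.List;

public class StoreMetricNames {
    // The four suffixes that the listing repeats for every operation.
    private static final String[] SUFFIXES = {"latency-avg", "latency-max", "rate", "total"};

    // Builds the metric names for one operation prefix, e.g. "range-time".
    static List<String> metricNames(String operation) {
        List<String> names = new ArrayList<>();
        for (String suffix : SUFFIXES) {
            names.add(operation + "-" + suffix);
        }
        return names;
    }

    public static void main(String[] args) {
        // Operation prefixes listed for the window-state store types.
        String[] windowOps = {
            "get",
            "range-single-key-time-range",
            "range-key-and-time",
            "range-time",
            "range-all"
        };
        for (String op : windowOps) {
            System.out.println(metricNames(op));
        }
    }
}
```

Each prefix therefore yields four metrics, so a dashboard can tell the individual lookup variants apart instead of seeing one combined `fetch` metric.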

Proposed Changes

We propose replacing `WindowStoreIterator` with `KeyValueIterator`. However, we cannot change the return type of `WindowStore#fetch(K key, long timeFrom, long timeTo)` or `WindowStore#fetch(K key, Instant timeFrom, Instant timeTo)`, because this would break backward compatibility. Instead, we need to deprecate the corresponding methods and add new methods with a new name and a new return type (i.e., `fetch(K key, Instant from, Instant to)` → `range(K key, Instant from, Instant to)`). To avoid confusion about naming, we propose to deprecate all other `fetch()` methods and rename them to `range()`, too. This also includes renaming the methods on `ReadOnlySessionStore`.
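
The deprecate-and-add pattern can be illustrated with a minimal, self-contained sketch. It does not use the Kafka classes: `WindowedKey`, `SketchWindowStore`, and `InMemoryStore` are hypothetical stand-ins, a plain `java.util.Iterator` replaces `KeyValueIterator` (so `close()` is omitted), and implementing `range` as a default method that wraps `fetch` is an assumption made for illustration, not something this proposal prescribes.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class RangeMigrationSketch {
    // Hypothetical stand-in for Windowed<K>: a key plus its window-start timestamp.
    static final class WindowedKey<K> {
        private final K key;
        private final long windowStart;
        WindowedKey(K key, long windowStart) { this.key = key; this.windowStart = windowStart; }
        K key() { return key; }
        long windowStart() { return windowStart; }
    }

    // Hypothetical store interface offering the old and the new lookup side by side.
    interface SketchWindowStore<K, V> {
        // Old style: entries are keyed only by the window-start timestamp.
        @Deprecated
        Iterator<Map.Entry<Long, V>> fetch(K key, long timeFrom, long timeTo);

        // New style: same data, but each entry carries the key and the window start.
        default Iterator<Map.Entry<WindowedKey<K>, V>> range(K key, long timeFrom, long timeTo) {
            Iterator<Map.Entry<Long, V>> old = fetch(key, timeFrom, timeTo);
            return new Iterator<Map.Entry<WindowedKey<K>, V>>() {
                @Override public boolean hasNext() { return old.hasNext(); }
                @Override public Map.Entry<WindowedKey<K>, V> next() {
                    Map.Entry<Long, V> e = old.next();
                    return new SimpleImmutableEntry<>(new WindowedKey<>(key, e.getKey()), e.getValue());
                }
            };
        }
    }

    // Tiny in-memory implementation, only here to make the sketch runnable.
    static final class InMemoryStore<K, V> implements SketchWindowStore<K, V> {
        private final Map<K, TreeMap<Long, V>> data = new HashMap<>();

        void put(K key, long windowStart, V value) {
            data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
        }

        @Override
        @Deprecated
        public Iterator<Map.Entry<Long, V>> fetch(K key, long timeFrom, long timeTo) {
            TreeMap<Long, V> windows = data.getOrDefault(key, new TreeMap<>());
            // Both time bounds are treated as inclusive in this sketch.
            return windows.subMap(timeFrom, true, timeTo, true).entrySet().iterator();
        }
    }

    public static void main(String[] args) {
        InMemoryStore<String, Integer> store = new InMemoryStore<>();
        store.put("a", 0L, 1);
        store.put("a", 10L, 2);
        store.put("a", 20L, 3);
        Iterator<Map.Entry<WindowedKey<String>, Integer>> it = store.range("a", 5L, 20L);
        while (it.hasNext()) {
            Map.Entry<WindowedKey<String>, Integer> e = it.next();
            System.out.println(e.getKey().key() + "@" + e.getKey().windowStart() + " = " + e.getValue());
        }
    }
}
```

Callers of the old method keep compiling (with a deprecation warning), while new code can switch to the `range` variant and read the key and window start from each entry.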

Compatibility, Deprecation, and Migration Plan

Because we only deprecate an interface and existing methods and add new methods, without changing any existing signatures, this change is backward compatible. When the interface is actually removed in a future release, the deprecation period will have passed, and user code should already have been rewritten.

Test Plan

N/A

Rejected Alternatives

N/A
