Status
Current state: "Under Discussion"
Discussion thread: [DISCUSS] KIP-439: Deprecate Interface WindowStoreIterator
JIRA:
Released: 2.3
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The `WindowStore` interface has multiple `fetch()` methods, but their return types are inconsistent: two methods return `WindowStoreIterator` while all others return `KeyValueIterator`. We should align the return types and replace `WindowStoreIterator` with `KeyValueIterator`. Additionally, the returned key type should be `Windowed<K>` instead of only a `Long` that encodes the window-start timestamp. For this, all existing `fetch()` methods need to be deprecated and replaced with new `range()` methods.
Furthermore, we want to split up the metrics so that each method can be tracked independently. At the moment, a single `fetch` metric is shared by all methods named `fetch()`, regardless of their parameter list.
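To illustrate the per-method tracking, here is a minimal sketch in which each overload records under its own metric name prefix. The prefixes are the ones listed under Public Interfaces; the mapping of prefix to overload is inferred from the names and is an assumption. `CallCounters` and `MeteredStoreSketch` are hypothetical helpers, not part of the Kafka Streams metrics API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical call counter; stands in for the real metrics registry.
final class CallCounters {
    private final Map<String, LongAdder> totals = new ConcurrentHashMap<>();

    // Increment the "-total" style counter for one metric name prefix.
    void record(String metricPrefix) {
        totals.computeIfAbsent(metricPrefix, k -> new LongAdder()).increment();
    }

    // Number of calls recorded under the prefix, or 0 if none.
    long total(String metricPrefix) {
        LongAdder adder = totals.get(metricPrefix);
        return adder == null ? 0L : adder.sum();
    }
}

// Hypothetical store wrapper: only the metric bookkeeping is shown,
// the actual lookups are omitted.
final class MeteredStoreSketch {
    final CallCounters counters = new CallCounters();

    // Single key, time range (assumed mapping of prefix to overload).
    void range(String key, long timeFrom, long timeTo) {
        counters.record("range-single-key-time-range");
    }

    // Key range and time range.
    void range(String from, String to, long timeFrom, long timeTo) {
        counters.record("range-key-and-time");
    }

    // Time range over all keys.
    void range(long timeFrom, long timeTo) {
        counters.record("range-time");
    }

    // Everything in the store.
    void range() {
        counters.record("range-all");
    }

    // Point lookup of one window.
    void get(String key, long windowStartTime) {
        counters.record("get");
    }
}
```

With a single shared `fetch` metric, all of these calls would be indistinguishable; with one prefix per overload, an expensive full scan shows up separately from cheap point lookups.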
Public Interfaces
We propose to deprecate the `WindowStoreIterator` interface and the following `fetch(...)` methods, and to add new `range(...)` methods:
    package org.apache.kafka.streams.state;

    @Deprecated
    public interface WindowStoreIterator<V> extends KeyValueIterator<Long, V>, Closeable { }

    public interface ReadOnlyWindowStore<K, V> {
        // already deprecated (cf. KIP-358)
        @Deprecated
        WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
        @Deprecated
        KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
        @Deprecated
        KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

        // newly deprecated
        @Deprecated
        V fetch(K key, long time);
        @Deprecated
        WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
        @Deprecated
        KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime);
        @Deprecated
        KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
        @Deprecated
        KeyValueIterator<Windowed<K>, V> all();

        // new
        V get(K key, long windowStartTime);
        KeyValueIterator<Windowed<K>, V> range(K key, Instant from, Instant to) throws IllegalArgumentException;
        KeyValueIterator<Windowed<K>, V> range(K from, K to, Instant fromTime, Instant toTime);
        KeyValueIterator<Windowed<K>, V> range(Instant from, Instant to) throws IllegalArgumentException;
        KeyValueIterator<Windowed<K>, V> range();
    }

    // note: `WindowStore` does not deprecate overloads with primitive types
    // hence, we need to deprecate them now, and add corresponding new methods
    public interface WindowStore<K, V> {
        @Deprecated
        WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
        @Deprecated
        WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to);
        @Deprecated
        KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
        @Deprecated
        KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime);
        @Deprecated
        KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
        @Deprecated
        KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to);

        // new
        KeyValueIterator<Windowed<K>, V> range(K key, long timeFrom, long timeTo);
        KeyValueIterator<Windowed<K>, V> range(final K key, final Instant from, final Instant to);
        KeyValueIterator<Windowed<K>, V> range(K from, K to, long timeFrom, long timeTo);
        KeyValueIterator<Windowed<K>, V> range(final K from, final K to, final Instant fromTime, final Instant toTime);
        KeyValueIterator<Windowed<K>, V> range(long timeFrom, long timeTo);
        KeyValueIterator<Windowed<K>, V> range(final Instant from, final Instant to);
    }

    public interface ReadOnlySessionStore<K, AGG> {
        // deprecated
        @Deprecated
        KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
        @Deprecated
        KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);

        // added
        KeyValueIterator<Windowed<K>, AGG> range(final K key);
        KeyValueIterator<Windowed<K>, AGG> range(final K from, final K to);
    }

    public interface SessionStore<K, AGG> {
        // deprecated
        @Deprecated
        KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
        @Deprecated
        KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
        @Deprecated
        AGG fetchSession(final K key, final long startTime, final long endTime);

        // added
        KeyValueIterator<Windowed<K>, AGG> range(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
        KeyValueIterator<Windowed<K>, AGG> range(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
        AGG get(final K key, final long startTime, final long endTime);
    }

    // Metrics:

    kafka.streams
      type = stream-[storeType]-metrics
      client-id = [thread ID]
      task-id = [task ID]
      [storeType]-id = [store name]
      level = DEBUG

    // deprecated:

    [store-type] = rocksdb-window-state / in-memory-window-state / rocksdb-session-state / in-memory-session-state
      name = fetch-latency-avg
      name = fetch-latency-max
      name = fetch-rate
      name = fetch-total

    // new

    [store-type] = rocksdb-window-state / in-memory-window-state
      name = get-latency-avg
      name = get-latency-max
      name = get-rate
      name = get-total
      name = range-single-key-time-range-latency-avg
      name = range-single-key-time-range-latency-max
      name = range-single-key-time-range-rate
      name = range-single-key-time-range-total
      name = range-key-and-time-latency-avg
      name = range-key-and-time-latency-max
      name = range-key-and-time-rate
      name = range-key-and-time-total
      name = range-time-latency-avg
      name = range-time-latency-max
      name = range-time-rate
      name = range-time-total
      name = range-all-latency-avg
      name = range-all-latency-max
      name = range-all-rate
      name = range-all-total

    [store-type] = rocksdb-session-state / in-memory-session-state
      name = range-key-latency-avg
      name = range-key-latency-max
      name = range-key-rate
      name = range-key-total
      name = range-single-key-time-range-latency-avg
      name = range-single-key-time-range-latency-max
      name = range-single-key-time-range-rate
      name = range-single-key-time-range-total
      name = range-single-key-latency-avg
      name = range-single-key-latency-max
      name = range-single-key-rate
      name = range-single-key-total
      name = range-key-and-time-latency-avg
      name = range-key-and-time-latency-max
      name = range-key-and-time-rate
      name = range-key-and-time-total
      name = get-latency-avg
      name = get-latency-max
      name = get-rate
      name = get-total
Proposed Changes
We suggest replacing `WindowStoreIterator` with `KeyValueIterator`. However, we cannot change the return type of `WindowStore#fetch(K key, long timeFrom, long timeTo)` or `WindowStore#fetch(K key, Instant timeFrom, Instant timeTo)`, because this would break backward compatibility. Instead, we need to deprecate the corresponding methods and add new methods with a new name and a new return type (i.e., `fetch(K key, Instant from, Instant to)` → `range(K key, Instant from, Instant to)`). To avoid confusion about naming, we propose to deprecate all other `fetch()` methods and rename them to `range()`, too. This also includes renaming the methods on `ReadOnlySessionStore`.
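The deprecate-and-add pattern for the single-key overload can be sketched as follows. This is only an illustration under stated assumptions: `KeyValue`, `Windowed`, `SimpleWindowStore`, and `InMemoryWindowStore` are simplified, hypothetical stand-ins for the Kafka Streams types, plain `java.util.Iterator` replaces `KeyValueIterator`, and the `default` method that adapts the old result is one possible way to bridge the two shapes, not something this KIP prescribes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for org.apache.kafka.streams.KeyValue.
final class KeyValue<K, V> {
    final K key;
    final V value;
    KeyValue(K key, V value) { this.key = key; this.value = value; }
}

// Hypothetical stand-in for Windowed<K>: record key plus window-start timestamp.
final class Windowed<K> {
    final K key;
    final long windowStart;
    Windowed(K key, long windowStart) { this.key = key; this.windowStart = windowStart; }
}

// Simplified store: the old method stays (deprecated), a new one is added.
interface SimpleWindowStore<K, V> {
    // Old shape: entries are keyed only by the window-start timestamp.
    @Deprecated
    Iterator<KeyValue<Long, V>> fetch(K key, long timeFrom, long timeTo);

    // New shape: entries are keyed by Windowed<K>. Here it adapts the old result.
    default Iterator<KeyValue<Windowed<K>, V>> range(K key, long timeFrom, long timeTo) {
        Iterator<KeyValue<Long, V>> legacy = fetch(key, timeFrom, timeTo);
        return new Iterator<KeyValue<Windowed<K>, V>>() {
            @Override public boolean hasNext() { return legacy.hasNext(); }
            @Override public KeyValue<Windowed<K>, V> next() {
                KeyValue<Long, V> kv = legacy.next();
                return new KeyValue<>(new Windowed<>(key, kv.key), kv.value);
            }
        };
    }
}

// Toy in-memory implementation used only to exercise the sketch.
final class InMemoryWindowStore<V> implements SimpleWindowStore<String, V> {
    private final Map<String, TreeMap<Long, V>> data = new HashMap<>();

    void put(String key, V value, long windowStart) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
    }

    @Override
    @Deprecated
    public Iterator<KeyValue<Long, V>> fetch(String key, long timeFrom, long timeTo) {
        List<KeyValue<Long, V>> out = new ArrayList<>();
        TreeMap<Long, V> windows = data.getOrDefault(key, new TreeMap<>());
        // Both time bounds are treated as inclusive in this sketch.
        for (Map.Entry<Long, V> e : windows.subMap(timeFrom, true, timeTo, true).entrySet()) {
            out.add(new KeyValue<>(e.getKey(), e.getValue()));
        }
        return out.iterator();
    }
}
```

Existing callers of `fetch(...)` keep compiling (with a deprecation warning), while new callers use `range(...)` and read the record key and window start from the `Windowed` key instead of a bare `Long`.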
Compatibility, Deprecation, and Migration Plan
Because we only deprecate an interface and do not change any other code, this change is backward compatible. When the interface is actually removed and the return type is changed in a future release, the deprecation period will have passed, so user code should already have been rewritten.
Test Plan
N/A
Rejected Alternatives
N/A