...
JIRA:
Released: 2.34
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The window and session store interfaces have confusing method names and return types:
- The `ReadOnlyWindowStore` interface has multiple methods to `fetch()` data. However, the return types are mixed up: two methods return `WindowStoreIterator` while all others return `KeyValueIterator`.
- The `ReadOnlyWindowStore` interface has the methods `fetch()`, `fetchAll()`, and `all()`, which all do range queries with different parameters.
- The `ReadOnlySessionStore` interface has `fetch()` methods while the `SessionStore` interface has `findSessions()` methods, all of which do range queries with different parameters.
- `WindowStore#fetch(K, long)` and `SessionStore#fetchSession(K, long, long)` return a single value of a stored window or session but don't do a range query.
We should align the return types and method names of both interfaces to get a unified API.
Additionally, the provided metric with name `fetch` is used for all of the above-mentioned methods (i.e., an implicit roll-up), making it hard to reason about the metrics.
We should align the return types and replace `WindowStoreIterator` with `KeyValueIterator`. Additionally, the key type of the returned iterator should be `Windowed<K>` instead of only a `Long` that encodes the window-start timestamp. For this, all existing `fetch()` methods need to be deprecated and replaced with new `range()` methods. Furthermore, we want to split up the metric to be able to track each method call independently. At the moment, there is a single `fetch` metric that is used by all methods called `fetch()`, independent of their parameter list.
Public Interfaces
We propose to deprecate the interface `WindowStoreIterator` and the following `fetch(...)` methods. Furthermore, new `range(...)` and `get()` methods are added to replace the existing ones.
```java
package org.apache.kafka.streams.state;

@Deprecated
public interface WindowStoreIterator<V> extends KeyValueIterator<Long, V>, Closeable { /* unchanged */ }

public interface ReadOnlyWindowStore<K, V> {
    // already deprecated (cf. KIP-358)
    @Deprecated
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    @Deprecated
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
    @Deprecated
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

    // newly deprecated
    @Deprecated
    V fetch(K key, long time);
    @Deprecated
    WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
    @Deprecated
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime);
    @Deprecated
    KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;
    @Deprecated
    KeyValueIterator<Windowed<K>, V> all();

    // new
    V get(K key, long windowStartTime);
    KeyValueIterator<Windowed<K>, V> range(K key, Instant from, Instant to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> range(K from, K to, Instant fromTime, Instant toTime);
    KeyValueIterator<Windowed<K>, V> range(Instant from, Instant to) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> range();
}

// note: `WindowStore` does not deprecate overloads with primitive types
// hence, we need to deprecate them now, and add corresponding new `range()` methods
public interface WindowStore<K, V> {
    // deprecated
    @Deprecated
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    @Deprecated
    WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to);
    @Deprecated
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
    @Deprecated
    KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime);
    @Deprecated
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
    @Deprecated
    KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to);

    // new
    KeyValueIterator<Windowed<K>, V> range(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> range(final K key, final Instant from, final Instant to);
    KeyValueIterator<Windowed<K>, V> range(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> range(final K from, final K to, final Instant fromTime, final Instant toTime);
    KeyValueIterator<Windowed<K>, V> range(long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> range(final Instant from, final Instant to);
}

public interface ReadOnlySessionStore<K, AGG> {
    // deprecated
    @Deprecated
    KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
    @Deprecated
    KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);

    // added
    KeyValueIterator<Windowed<K>, AGG> range(final K key);
    KeyValueIterator<Windowed<K>, AGG> range(final K from, final K to);
}

public interface SessionStore<K, AGG> {
    // deprecated
    @Deprecated
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
    @Deprecated
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
    @Deprecated
    AGG fetchSession(final K key, final long startTime, final long endTime);

    // added
    KeyValueIterator<Windowed<K>, AGG> range(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
    KeyValueIterator<Windowed<K>, AGG> range(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
    AGG get(final K key, final long startTime, final long endTime);
}
```
Metrics:
```
kafka.streams
  type = stream-[store-type]-metrics
  client-id = [thread ID]
  task-id = [task ID]
  [store-type]-id = [store name]
  level = DEBUG

// deprecated:
[store-type] = rocksdb-window-state / in-memory-window-state / rocksdb-session-state / in-memory-session-state
  name = fetch-latency-avg
  name = fetch-latency-max
  name = fetch-rate
  name = fetch-total

// new
[store-type] = rocksdb-window-state / in-memory-window-state
  name = get-latency-avg
  name = get-latency-max
  name = get-rate
  name = get-total
  name = range-single-key-time-range-latency-avg
  name = range-single-key-time-range-latency-max
  name = range-single-key-time-range-rate
  name = range-single-key-time-range-total
  name = range-key-and-time-latency-avg
  name = range-key-and-time-latency-max
  name = range-key-and-time-rate
  name = range-key-and-time-total
  name = range-time-latency-avg
  name = range-time-latency-max
  name = range-time-rate
  name = range-time-total
  name = range-all-latency-avg
  name = range-all-latency-max
  name = range-all-rate
  name = range-all-total

[store-type] = rocksdb-session-state / in-memory-session-state
  name = range-key-latency-avg
  name = range-key-latency-max
  name = range-key-rate
  name = range-key-total
  name = range-single-key-time-range-latency-avg
  name = range-single-key-time-range-latency-max
  name = range-single-key-time-range-rate
  name = range-single-key-time-range-total
  name = range-single-key-latency-avg
  name = range-single-key-latency-max
  name = range-single-key-rate
  name = range-single-key-total
  name = range-key-and-time-latency-avg
  name = range-key-and-time-latency-max
  name = range-key-and-time-rate
  name = range-key-and-time-total
  name = get-latency-avg
  name = get-latency-max
  name = get-rate
  name = get-total
```
Proposed Changes
We propose to replace `WindowStoreIterator` with `KeyValueIterator` as return type for all range query methods. However, we cannot change the return type of `WindowStore#fetch(K key, long timeFrom, long timeTo)` or `WindowStore#fetch(K key, Instant timeFrom, Instant timeTo)` because this would break backward compatibility. Instead, we need to deprecate the corresponding method and add a new method with a new name and return type (i.e., `fetch(K key, Instant from, Instant to)` → `range(K key, Instant from, Instant to)`). To avoid confusion about naming, we propose to deprecate all other `fetch()` methods and rename them to `range()`, too. This also includes renaming the methods on `ReadOnlySessionStore`.
Additionally, the key type of the returned iterator should be changed to `Windowed<K>` instead of only a `Long` that encodes the window-start timestamp. For this, the `WindowStoreIterator` interface and the corresponding `fetch()` methods are deprecated and new methods with new return types are added.
Furthermore, all methods returning an Iterator should have the same name (`range`), while methods that don't return an Iterator should have a different name (`get`).
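The naming and return-type rules above can be illustrated with a small, self-contained sketch. It does not use the Kafka Streams classes: `WindowQuerySketch` and `WindowedKey` are hypothetical stand-ins (the latter for `Windowed<K>`), `java.util.Iterator` over `Map.Entry` stands in for `KeyValueIterator`, and the inclusive time bounds are an assumption of this example. It only shows the intended shape: `get` returns a single value, `range` returns an iterator whose keys carry both the record key and the window start.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a window store; not the Kafka Streams API.
class WindowQuerySketch<K, V> {

    // Hypothetical stand-in for Windowed<K>: record key plus window-start timestamp.
    static final class WindowedKey<K> {
        final K key;
        final long windowStart;

        WindowedKey(K key, long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }
    }

    // record key -> (window-start timestamp -> value)
    private final Map<K, TreeMap<Long, V>> data = new HashMap<>();

    void put(K key, long windowStart, V value) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
    }

    // Point lookup: no iterator involved, hence the name `get`.
    V get(K key, long windowStart) {
        TreeMap<Long, V> windows = data.get(key);
        return windows == null ? null : windows.get(windowStart);
    }

    // Range query: returns an iterator keyed by (key, window start) instead of a bare Long.
    // Both time bounds are treated as inclusive (an assumption of this sketch).
    Iterator<Map.Entry<WindowedKey<K>, V>> range(K key, long timeFrom, long timeTo) {
        List<Map.Entry<WindowedKey<K>, V>> result = new ArrayList<>();
        TreeMap<Long, V> windows = data.get(key);
        if (windows != null) {
            for (Map.Entry<Long, V> e : windows.subMap(timeFrom, true, timeTo, true).entrySet()) {
                WindowedKey<K> windowedKey = new WindowedKey<>(key, e.getKey());
                result.add(new SimpleImmutableEntry<>(windowedKey, e.getValue()));
            }
        }
        return result.iterator();
    }
}
```

With a `Long`-keyed iterator the caller has to remember which record key was queried; with the windowed key that information travels with each entry, which is what makes a single return type usable for single-key and multi-key queries alike.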
Additionally, we want to split up the metrics to be able to track each method call independently. At the moment, there is a single `fetch` metric that is used by all of the `fetch()`, `fetchAll()`, `all()`, and `findSessions()` methods, independent of their parameter list. We split the existing `fetch` metric into fine-grained metrics per method overload.
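As a rough illustration of the split, the sketch below keeps one counter per method overload instead of a single shared counter. The metric-name prefixes are taken from the window-store list in this KIP; the `MetricSplitSketch` class, the `Overload` enum, and the assignment of prefixes to overloads are assumptions made for the example and are not part of the proposal or of the Kafka metrics API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical counter registry; real stores would use Kafka's metrics sensors instead.
class MetricSplitSketch {

    // Assumed mapping from window-store method overloads to metric-name prefixes.
    enum Overload {
        GET("get"),
        RANGE_SINGLE_KEY_TIME_RANGE("range-single-key-time-range"),
        RANGE_KEY_AND_TIME("range-key-and-time"),
        RANGE_TIME("range-time"),
        RANGE_ALL("range-all");

        final String prefix;

        Overload(String prefix) {
            this.prefix = prefix;
        }
    }

    private final Map<String, Long> totals = new HashMap<>();

    // Each overload increments only its own "<prefix>-total" counter.
    void record(Overload overload) {
        totals.merge(overload.prefix + "-total", 1L, Long::sum);
    }

    // Returns 0 for a metric that was never recorded.
    long total(String metricName) {
        return totals.getOrDefault(metricName, 0L);
    }
}
```

With separate counters, a spike in full scans (`range-all-total`) is no longer hidden behind cheap point lookups (`get-total`), which is the reasoning problem the roll-up caused.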
Compatibility, Deprecation, and Migration Plan
Because we only deprecate interfaces/methods but don't change any other code, this change is backward compatible. In a future release, when the interface is actually removed, the deprecation period will have passed and user code should already have been rewritten. Calls to deprecated methods are redirected to the new methods, and the existing `fetch` metric is still recorded unmodified.
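One way to redirect deprecated calls is a default method on the interface that forwards to the new method. The sketch below shows that pattern in isolation; `DeprecationSketch`, its nested `Store` interface, and `InMemoryStore` are hypothetical names, and whether the actual implementation uses default methods or forwards inside each store class is not specified here.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical types illustrating the redirect; not the Kafka Streams interfaces.
class DeprecationSketch {

    interface Store<V> {
        // New method name.
        Iterator<Map.Entry<Long, V>> range(long timeFrom, long timeTo);

        // Old method name: kept for compatibility, forwards to the new method.
        @Deprecated
        default Iterator<Map.Entry<Long, V>> fetchAll(long timeFrom, long timeTo) {
            return range(timeFrom, timeTo);
        }
    }

    static final class InMemoryStore<V> implements Store<V> {
        private final TreeMap<Long, V> data = new TreeMap<>();
        int rangeCalls = 0; // counts how often the new method actually ran

        void put(long timestamp, V value) {
            data.put(timestamp, value);
        }

        @Override
        public Iterator<Map.Entry<Long, V>> range(long timeFrom, long timeTo) {
            rangeCalls++;
            // Inclusive bounds are an assumption of this sketch.
            return data.subMap(timeFrom, true, timeTo, true).entrySet().iterator();
        }
    }
}
```

Existing callers of the old name keep compiling (with a deprecation warning) and get the same results, so they can migrate at their own pace during the deprecation period.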
Test Plan
N/A
Rejected Alternatives
...