THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams.state; @Deprecate public interface WindowStoreIterator<V> extends KeyValueIterator<Long, V>, Closeable; public interface ReadOnlyWindowStore<K, V> { // already deprecated (cf. KIP-358) @Deprecate WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo); @Deprecate KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo); @Deprecate KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo); // kept KeyValueIterator<Windowed<K>, V> all(); // newly deprecated @Deprecate V fetch(K key, long time); @Deprecate WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException; @Deprecate KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime) throws IllegalArgumentException; @Deprecate KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException; @Deprecate KeyValueIterator<Windowed<K>, V> all(); // new V get(K key, long windowStartTime); KeyValueIterator<Windowed<K>, V> range(K key, Instant from, Instant to) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, V> range(K from, K to, Instant fromTimefrom, Instant toTimeto) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, V> range(Instant from, Instant to) throws IllegalArgumentException; KeyValueIterator<Windowed<K>, V> range(); } // note: `WindowStore` does not deprecate overloads with primitive types // hence, we need to deprecate them now, and add corresponding `range()` methods public interface WindowStore<K, V> extends ReadOnlyWindowStore<K, V> { // deprecate @Deprecate WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo); @Deprecate WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to); @Deprecate KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo); @Deprecate KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime); @Deprecate KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo); @Deprecate KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) ; // new KeyValueIterator<Windowed<K>, V> range(K key, long timeFromfromTime, long timeTotoTime); KeyValueIterator<Windowed<K>, V> range(final K key, final Instant from, final Instant to); KeyValueIterator<Windowed<K>, V> range(K from, K to, long timeFromfromTime, long timeTo); KeyValueIterator<Windowed<K>, V> range(final K from, final K to, final Instant fromTime, final Instant toTime); KeyValueIterator<Windowed<K>, V> range(long timeFromfromTime, long timeTotoTime); KeyValueIterator<Windowed<K>, V> range(final Instant from, final Instant to) ; } public interface ReadOnlySessionStore<K, ReadOnlySessionStoreV> { // deprecated @Deprecate KeyValueIterator<Windowed<K>, AGG> fetch(final K key); @Deprecate KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to); // addednew KeyValueIterator<Windowed<K>, AGG> range(final K key); KeyValueIterator<Windowed<K>, AGG> range(final K from, final K to); } public interface SessionStore SessionStore<K, V> extends ReadOnlySessionStore<K, V> { // deprecated @Deprecate KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); @Deprecate KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime); @Deprecate AGG fetchSession(final K key, final long startTime, final long endTime); // addednew KeyValueIterator<Windowed<K>, AGG> range(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); @Deprecate KeyValueIterator<Windowed<K>, AGG> range(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime); @Deprecate AGG get(final K key, long finalsessionStartTime, long startTime, final long endTime sessionEndTime); } // unmodified: just added for completeness public interface ReadOnlyKeyValueStore<K, V> { // kept KeyValueIterator<K, V> all(); } // Metrics: kafka.streams type = stream-[storeType]-metrics client-id = [thread ID] task-id = [task ID] [storeType]-id = [store name] level = DEBUG // deprecated: [store-type] = rocksdb-window-state / in-memory-window-state / rocksdb-session-state / in-memory-session-state name = fetch-latency-avg name = fetch-latency-max name = fetch-rate name = fetch-total [store-type] = rocksdb-state / in-memory--state name = all-latency-avg name = all-latency-max name = all-rate name = all-total name = range-latency-avg name = range-latency-max name = range-rate name = range-total // new [store-type] = rocksdb-window-state / in-memory-window-state name = get-latency-avg name = get-latency-max name = get-rate name = get-total name = range-single-key-time-range-latency-avg name = range-single-key-time-range-latency-max name = range-single-key-time-range-rate name = range-single-key-time-range-total name = range-key-and-time-latency-avg name = range-key-and-time-latency-max name = range-key-and-time-rate name = range-key-and-time-total name = range-time-latency-avg name = range-time-latency-max name = range-time-rate name = range-time-total name = range-all-latency-avg name = range-all-latency-max name = range-all-rate name = range-all-total [store-type] = rocksdb-session-state / in-memory-session-state name = range-key-latency-avg name = range-key-latency-max name = range-key-rate name = range-key-total name = range-single-key-time-range-latency-avg name = range-single-key-time-range-latency-max name = range-single-key-time-range-rate name = range-single-key-time-range-total name = range-single-key-latency-avg name = range-single-key-latency-max name = range-single-key-rate name = range-single-key-total name = range-key-and-time-latency-avg name = range-key-and-time-latency-max name = range-key-and-time-rate name = range-key-and-time-total name = get-latency-avg name = get-latency-max name = get-rate name = get-total [store-type] = rocksdb-state / in-memory-state name = range-all-latency-avg name = range-all-latency-max name = range-all-rate name = range-all-total name = range-key-latency-avg name = range-key-latency-max name = range-key-rate name = range-key-total |
Proposed Changes
We propose to replace `WindowStoreIterator` with `KeyValueIterator` as return type for all range query methods. Additionally, the return type for window/session stores should be changed to `Windowed<K>` key type instead of returning only a `Long` that encoded the window-start timestamp. For this, the `WindowStoreIterator` interface and the corresponding `fetch()` methods are deprecated and new methods with new return types are added.
...