Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
package org.apache.kafka.streams.state;


@Deprecate
public interface WindowStoreIterator<V> extends KeyValueIterator<Long, V>, Closeable;

/**
 * Sketch of the read-only window-store query API after this proposal:
 * the {@code fetch()}/{@code fetchAll()} overloads are deprecated and
 * {@code get()}/{@code range()} methods are added next to them.
 */
public interface ReadOnlyWindowStore<K, V> {
  // already deprecated (cf. KIP-358)
  @Deprecated
  WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

  // kept
  // NOTE(review): the compared versions listed `all()` twice -- once here as "kept" and once
  // under "newly deprecated". A method can only be declared once, so a single declaration
  // remains; confirm whether `all()` is meant to stay or to be deprecated in favor of `range()`.
  KeyValueIterator<Windowed<K>, V> all();

  // newly deprecated
  @Deprecated
  V fetch(K key, long time);
  @Deprecated
  WindowStoreIterator<V> fetch(K key, Instant from, Instant to) throws IllegalArgumentException;
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime) throws IllegalArgumentException;
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(Instant from, Instant to) throws IllegalArgumentException;

  // new
  V get(K key, long windowStartTime);
  KeyValueIterator<Windowed<K>, V> range(K key, Instant from, Instant to) throws IllegalArgumentException;
  // time bounds are named fromTime/toTime because from/to already name the key bounds
  KeyValueIterator<Windowed<K>, V> range(K from, K to, Instant fromTime, Instant toTime) throws IllegalArgumentException;
  KeyValueIterator<Windowed<K>, V> range(Instant from, Instant to) throws IllegalArgumentException;
  KeyValueIterator<Windowed<K>, V> range();
}

// note: `WindowStore` does not deprecate overloads with primitive types
//       hence, we need to deprecate them now, and add corresponding `range()` methods
/**
 * Sketch of the read-write window store after this proposal: every
 * {@code fetch()}/{@code fetchAll()} overload (primitive-{@code long} and
 * {@code Instant} based) is deprecated and a {@code range()} method with the
 * same parameters is declared for each of them.
 */
public interface WindowStore<K, V> extends ReadOnlyWindowStore<K, V> {
  // deprecated
  @Deprecated
  WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
  @Deprecated
  WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
  @Deprecated
  KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to);

  // new
  KeyValueIterator<Windowed<K>, V> range(K key, long fromTime, long toTime);
  KeyValueIterator<Windowed<K>, V> range(final K key, final Instant from, final Instant to);
  KeyValueIterator<Windowed<K>, V> range(K from, K to, long fromTime, long toTime);
  KeyValueIterator<Windowed<K>, V> range(final K from, final K to, final Instant fromTime, final Instant toTime);
  KeyValueIterator<Windowed<K>, V> range(long fromTime, long toTime);
  KeyValueIterator<Windowed<K>, V> range(final Instant from, final Instant to);
}

/**
 * Sketch of the read-only session-store query API after this proposal:
 * both {@code fetch()} overloads are deprecated and {@code range()} methods
 * with the same parameters are added.
 *
 * @param <K>   key type
 * @param <AGG> aggregated value type used by every method below
 */
public interface ReadOnlySessionStore<K, AGG> {
  // deprecated
  @Deprecated
  KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
  @Deprecated
  KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);

  // new
  KeyValueIterator<Windowed<K>, AGG> range(final K key);
  KeyValueIterator<Windowed<K>, AGG> range(final K from, final K to);
}

public interface SessionStore SessionStore<K, V> extends ReadOnlySessionStore<K, V> {
  // deprecated
  @Deprecate
  KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
  @Deprecate
  KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
  @Deprecate
  AGG fetchSession(final K key, final long startTime, final long endTime);

  // addednew
  KeyValueIterator<Windowed<K>, AGG> range(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
  @Deprecate
  KeyValueIterator<Windowed<K>, AGG> range(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
  @Deprecate
  AGG get(final K key, long finalsessionStartTime, long startTime, final long endTime sessionEndTime);
}

// unmodified: just added for completeness
/**
 * Listed for reference only; this proposal leaves the interface as it is.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface ReadOnlyKeyValueStore<K, V> {

  // kept
  KeyValueIterator<K, V> all();

}



// Metrics: kafka.streams
type = stream-[store-type]-metrics
client-id = [thread ID]
task-id = [task ID]
[store-type]-id = [store name]
level = DEBUG

// deprecated:
[store-type] = rocksdb-window-state / in-memory-window-state / rocksdb-session-state / in-memory-session-state
name = fetch-latency-avg
name = fetch-latency-max
name = fetch-rate
name = fetch-total

[store-type] = rocksdb-state / in-memory-state
name = all-latency-avg
name = all-latency-max
name = all-rate
name = all-total

name = range-latency-avg
name = range-latency-max
name = range-rate
name = range-total


// new
[store-type] = rocksdb-window-state / in-memory-window-state
name = get-latency-avg
name = get-latency-max
name = get-rate
name = get-total

name = range-single-key-time-range-latency-avg
name = range-single-key-time-range-latency-max
name = range-single-key-time-range-rate
name = range-single-key-time-range-total

name = range-key-and-time-latency-avg
name = range-key-and-time-latency-max
name = range-key-and-time-rate
name = range-key-and-time-total

name = range-time-latency-avg
name = range-time-latency-max
name = range-time-rate
name = range-time-total

name = range-all-latency-avg
name = range-all-latency-max
name = range-all-rate
name = range-all-total

[store-type] = rocksdb-session-state / in-memory-session-state
name = range-key-latency-avg
name = range-key-latency-max
name = range-key-rate
name = range-key-total

name = range-single-key-time-range-latency-avg
name = range-single-key-time-range-latency-max
name = range-single-key-time-range-rate
name = range-single-key-time-range-total

name = range-single-key-latency-avg
name = range-single-key-latency-max
name = range-single-key-rate
name = range-single-key-total

name = range-key-and-time-latency-avg
name = range-key-and-time-latency-max
name = range-key-and-time-rate
name = range-key-and-time-total

name = get-latency-avg
name = get-latency-max
name = get-rate
name = get-total

[store-type] = rocksdb-state / in-memory-state
name = range-all-latency-avg
name = range-all-latency-max
name = range-all-rate
name = range-all-total

name = range-key-latency-avg
name = range-key-latency-max
name = range-key-rate
name = range-key-total

Proposed Changes

We propose to replace `WindowStoreIterator` with `KeyValueIterator` as the return type for all range query methods. Additionally, the key type of the iterators returned by window/session stores should be changed to `Windowed<K>`, instead of only a `Long` that encodes the window-start timestamp. To this end, the `WindowStoreIterator` interface and the corresponding `fetch()` methods are deprecated, and new methods with the new return types are added.

...