...

For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest window.

Similar guarantees are provided on other fetch and range operations. But in the case of key ranges, there are some nuances regarding order:

The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s and must not return null values. No ordering guarantees are provided.

Ordering is not guaranteed because the backing structure is based on maps keyed by o.a.k.common.utils.Bytes. However, Bytes supports lexicographic byte-array comparison, which defines the ordering in both the in-memory and RocksDB stores.
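As an illustration of how lexicographic byte-array comparison yields a usable ordering, here is a minimal sketch that uses only the JDK. It assumes a plain TreeMap keyed by byte[] with `Arrays.compareUnsigned` as the comparator; this stands in for Bytes and is not the actual store implementation. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

class LexicographicOrderSketch {
    // Returns the stored values in key order, or in reverse key order,
    // where keys are compared as unsigned bytes, lexicographically.
    static List<String> valuesInOrder(final boolean reverse) {
        // Assumption: Arrays.compareUnsigned stands in for the Bytes comparator.
        final Comparator<byte[]> lexicographic = Arrays::compareUnsigned;
        final TreeMap<byte[], String> store = new TreeMap<>(lexicographic);
        store.put("b".getBytes(StandardCharsets.UTF_8), "2");
        store.put("a".getBytes(StandardCharsets.UTF_8), "1");
        store.put("ab".getBytes(StandardCharsets.UTF_8), "1.5");
        if (reverse) {
            // descendingMap() is a view over the same entries, newest key first
            return new ArrayList<>(store.descendingMap().values());
        }
        return new ArrayList<>(store.values());
    }

    public static void main(final String[] args) {
        System.out.println(valuesInOrder(false)); // [1, 1.5, 2]
        System.out.println(valuesInOrder(true));  // [2, 1.5, 1]
    }
}
```

Because the comparator defines a total order over keys, a descending view is available at no extra cost, which is the property the proposal relies on.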

These APIs constrain the usage of local state stores for some use cases:

...

If a backward read direction becomes available, we could start from the latest record within a time range and go backwards, returning the first N values more efficiently.
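The "latest N within a time range" access pattern can be sketched with a sorted in-memory map. This is only an illustration under stated assumptions: a `NavigableMap<Long, String>` keyed by timestamp stands in for the store, and `latestN` and `sample` are hypothetical helpers, not part of any Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class LatestNSketch {
    // Returns up to n values with timestamps in [timeFrom, timeTo], newest first.
    static List<String> latestN(final NavigableMap<Long, String> byTimestamp,
                                final long timeFrom, final long timeTo, final int n) {
        final List<String> result = new ArrayList<>();
        // The descending view starts at the newest entry, so the loop can stop
        // after n entries without visiting the older ones.
        for (final Map.Entry<Long, String> entry
                : byTimestamp.subMap(timeFrom, true, timeTo, true).descendingMap().entrySet()) {
            if (result.size() == n) {
                break;
            }
            result.add(entry.getValue());
        }
        return result;
    }

    // Hypothetical sample data: one record per timestamp from 1 to 10.
    static NavigableMap<Long, String> sample() {
        final NavigableMap<Long, String> records = new TreeMap<>();
        for (long t = 1; t <= 10; t++) {
            records.put(t, "trace-" + t);
        }
        return records;
    }

    public static void main(final String[] args) {
        System.out.println(latestN(sample(), 3L, 8L, 2)); // [trace-8, trace-7]
    }
}
```

With forward-only iteration, the same query would have to walk every record in the range before reaching the newest ones.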

In the Zipkin Kafka-based storage, we are planning to use this feature to replace two KeyValueStores (one for traces indexed by id, and another for trace_ids indexed by timestamp) with one WindowStore. A backward read direction will support queries like “within this time range, find the last traces that match these criteria” and return the latest values quickly.

Internally, both implementations, persistent (RocksDB) and in-memory (TreeMap), support reverse/descending iteration:

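// RocksDB: forward iteration starts at the first key and advances with next()
final RocksIterator iter = db.newIterator();
iter.seekToFirst();
iter.next();
// RocksDB: reverse iteration starts at the last key and steps back with prev()
final RocksIterator reverse = db.newIterator();
reverse.seekToLast();
reverse.prev();
// TreeMap: ascending and descending views over the same keys
final TreeMap<String, String> map = new TreeMap<>();
final NavigableSet<String> nav = map.navigableKeySet();
final NavigableSet<String> rev = map.descendingKeySet();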

Reference issues

Proposed Changes

Introduce a new StreamsConfig configuration to flag support for backwards iteration:

Code Block
languagejava
public class StreamsConfig extends AbstractConfig {
    public static final String ENABLE_BACKWARD_ITERATION_CONFIG = "enable.backward.iteration";
    private static final String ENABLE_BACKWARD_ITERATION_DOC = "If true, range operations accept (from, to) arguments where from > to, returning the most recent records first";
}

If true, ReadOnlyKeyValueStore will accept (from, to) argument pairs where from > to, and will iterate in reverse order.
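The flag-dependent semantics described above could look roughly like the following sketch. It is an assumption-laden illustration: `FlaggedRangeSketch` is a hypothetical in-memory class, its boolean field stands in for the proposed configuration value, and lists are returned instead of KeyValueIterator to keep it free of Kafka dependencies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

class FlaggedRangeSketch {
    private final TreeMap<String, String> data = new TreeMap<>();
    // Stands in for the value of the proposed enable.backward.iteration setting.
    private final boolean backwardIterationEnabled;

    FlaggedRangeSketch(final boolean backwardIterationEnabled) {
        this.backwardIterationEnabled = backwardIterationEnabled;
    }

    void put(final String key, final String value) {
        data.put(key, value);
    }

    // Inclusive range over keys; from > to is only meaningful when the flag is set.
    List<String> range(final String from, final String to) {
        if (from.compareTo(to) <= 0) {
            return new ArrayList<>(data.subMap(from, true, to, true).values());
        }
        if (!backwardIterationEnabled) {
            // Current behaviour: an inverted range yields nothing.
            return Collections.emptyList();
        }
        // Flag enabled: swap the bounds and walk the range in descending order.
        return new ArrayList<>(data.subMap(to, true, from, true).descendingMap().values());
    }

    // Hypothetical helper that builds a small store for demonstration.
    static FlaggedRangeSketch sample(final boolean enabled) {
        final FlaggedRangeSketch store = new FlaggedRangeSketch(enabled);
        store.put("a", "1");
        store.put("b", "2");
        store.put("c", "3");
        return store;
    }

    public static void main(final String[] args) {
        System.out.println(sample(false).range("c", "a")); // []
        System.out.println(sample(true).range("c", "a"));  // [3, 2, 1]
    }
}
```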

To complete support for backwards iteration, each `all` operation will be accompanied by a `reverseAll`:

Code Block
languagejava
public interface ReadOnlyKeyValueStore<K, V> { KeyValueIterator<K, V> reverseAll(); } 
public interface ReadOnlyWindowStore<K, V> { KeyValueIterator<Windowed<K>, V> reverseAll(); } 

The StreamsConfig flag will be passed to stores via `ProcessorContext`.

...

There are 2 important ranges in Kafka Streams Stores:

  • Key Range
  • Time Range

Reverse Key Ranges

Add a new interface for reverse iteration over a KeyValueStore:

public interface ReadOnlyReverseKeyValueStore<K, V> {
    KeyValueIterator<K, V> reverseRange(K from, K to);
    KeyValueIterator<K, V> reverseAll();
}

And create a new interface on top of KeyValueStore:


public interface KeyValueStoreWithReverseIteration<K, V> extends KeyValueStore<K, V>, ReadOnlyReverseKeyValueStore<K, V> {
//...
}
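To make the intent of the reverse key-range methods concrete, here is a minimal in-memory sketch. It assumes a simplified, hypothetical interface (`ReverseReadable`) that mirrors the two proposed methods but returns `Iterator<Map.Entry<K, V>>` instead of KeyValueIterator, so that it runs with the JDK alone. Bounds are assumed to be inclusive with from <= to.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReverseKeyValueSketch {
    // Simplified stand-in for the proposed read-only reverse interface.
    interface ReverseReadable<K, V> {
        Iterator<Map.Entry<K, V>> reverseRange(K from, K to);
        Iterator<Map.Entry<K, V>> reverseAll();
    }

    static class InMemoryStore<K extends Comparable<K>, V> implements ReverseReadable<K, V> {
        private final TreeMap<K, V> map = new TreeMap<>();

        void put(final K key, final V value) {
            map.put(key, value);
        }

        @Override
        public Iterator<Map.Entry<K, V>> reverseRange(final K from, final K to) {
            // subMap throws IllegalArgumentException if from > to.
            return map.subMap(from, true, to, true).descendingMap().entrySet().iterator();
        }

        @Override
        public Iterator<Map.Entry<K, V>> reverseAll() {
            return map.descendingMap().entrySet().iterator();
        }
    }

    // Collects the keys of an iterator in the order they are returned.
    static <K, V> List<K> keys(final Iterator<Map.Entry<K, V>> iterator) {
        final List<K> keys = new ArrayList<>();
        while (iterator.hasNext()) {
            keys.add(iterator.next().getKey());
        }
        return keys;
    }

    // Hypothetical sample store with keys a, b, c, d.
    static InMemoryStore<String, Integer> sample() {
        final InMemoryStore<String, Integer> store = new InMemoryStore<>();
        store.put("a", 1);
        store.put("b", 2);
        store.put("c", 3);
        store.put("d", 4);
        return store;
    }

    public static void main(final String[] args) {
        System.out.println(keys(sample().reverseRange("b", "c"))); // [c, b]
        System.out.println(keys(sample().reverseAll()));           // [d, c, b, a]
    }
}
```

Keeping the reverse methods on a separate interface means existing callers of the forward-only methods compile unchanged.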

Backward Time Ranges


Window and Session stores are based on a set of KeyValue stores (segments) organized by a time-based index. Therefore, for these stores the time range is more important than the key range when looking up values.


New interfaces will be added for each store type, including backward methods:


public interface ReadOnlyBackwardWindowStore<K, V> {
    @Deprecated
    WindowStoreIterator<V> backwardFetch(K key, long timeFrom, long timeTo);
    WindowStoreIterator<V> backwardFetch(K key, Instant from, Instant to) throws IllegalArgumentException;
    @Deprecated
    KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, Instant fromTime, Instant toTime) throws IllegalArgumentException;
    KeyValueIterator<Windowed<K>, V> backwardAll();
    @Deprecated
    KeyValueIterator<Windowed<K>, V> backwardFetchAll(long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> backwardFetchAll(Instant from, Instant to) throws IllegalArgumentException;
}

public interface ReadOnlyBackwardSessionStore<K, AGG> {
    KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K from, final K to);
    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
}

Both SessionStore and WindowStore will be extended with interfaces on top of them:

public interface WindowStoreWithBackwardIteration<K, V> extends WindowStore<K, V>, ReadOnlyBackwardWindowStore<K, V> {
// ...
}

public interface SessionStoreWithBackwardIteration<K, AGG> extends SessionStore<K, AGG>, ReadOnlyBackwardSessionStore<K, AGG> {
//...
}
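A backward time-range fetch over time-indexed segments could work roughly as in the sketch below. It is a simplified model under several assumptions: a single key, non-negative timestamps, a hypothetical fixed segment interval, and nested TreeMaps standing in for the segment stores. It is not the actual segmented store implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class BackwardFetchSketch {
    private final long segmentInterval;
    // segment id (windowStart / segmentInterval) -> (windowStart -> value)
    private final TreeMap<Long, TreeMap<Long, String>> segments = new TreeMap<>();

    BackwardFetchSketch(final long segmentInterval) {
        this.segmentInterval = segmentInterval;
    }

    void put(final long windowStart, final String value) {
        segments.computeIfAbsent(windowStart / segmentInterval, id -> new TreeMap<>())
                .put(windowStart, value);
    }

    // Returns values for windows starting in [timeFrom, timeTo], newest first.
    List<String> backwardFetch(final long timeFrom, final long timeTo) {
        final List<String> result = new ArrayList<>();
        // Visit only the segments overlapping the range, newest segment first,
        // then walk each segment's windows in descending order.
        for (final TreeMap<Long, String> segment
                : segments.subMap(timeFrom / segmentInterval, true,
                                  timeTo / segmentInterval, true)
                          .descendingMap().values()) {
            result.addAll(segment.subMap(timeFrom, true, timeTo, true).descendingMap().values());
        }
        return result;
    }

    // Hypothetical sample: segments of 10 time units with five windows.
    static BackwardFetchSketch sample() {
        final BackwardFetchSketch store = new BackwardFetchSketch(10L);
        for (final long start : new long[] {1L, 5L, 12L, 25L, 31L}) {
            store.put(start, "w" + start);
        }
        return store;
    }

    public static void main(final String[] args) {
        System.out.println(sample().backwardFetch(5L, 25L)); // [w25, w12, w5]
    }
}
```

The time index selects the candidate segments first, which is why the time range matters more than the key range for these stores.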

Class hierarchy will change from:

[Image: current class hierarchy]


To:

[Image: proposed class hierarchy]

Compatibility, Deprecation, and Migration Plan

The StreamsConfig flag avoids affecting users who rely on the current behaviour, where from > to returns an empty iterator. Only when users enable the flag will from > to return a reversed iterator.

As the new top-level interfaces will be implemented by internal components (e.g. RocksDB, InMemory) that already support the old ones, current users won't be affected.

New users will be able to choose between the current APIs (forward-only) and the proposed ones (forward + backward). Therefore, this change is backwards compatible.

Rejected Alternatives

  • Initially, an additional parameter on all read-only store methods was considered, e.g. Store#fetch(keyFrom, keyTo, timeFrom, timeTo, ReadDirection.FORWARD|BACKWARD), but it was declined because passing the arguments in inverse order is more intuitive. As this could cause unexpected effects in future versions, a flag has been added to overcome this.
  • Implicit ordering by flipping the from and to variables has been discouraged in favor of a more explicit approach based on new interfaces that make the availability of reverse and backward fetch operations explicit.