Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Internally, both implementations: persistent (RocksDB), and in-memory (TreeMap) support reverse/descending iteration:


Code Block
final RocksIterator iter = db.newIterator();
iter.seekToFirst();
iter.next();
final RocksIterator reverse = db.newIterator();
reverse.seekToLast();
reverse.prev();

...

final 

...

TreeMap<String, 

...

String> map = new 

...

TreeMap<>();
final 

...

NavigableSet<String> nav = map.navigableKeySet();
final 

...

NavigableSet<String> rev = map.descendingKeySet();

...


Reference issues

...

There are 2 important ranges in Kafka Streams Stores:

  • Key Range
  • Time Range


Info

For SessionStore/ReadOnlySessionStore: findSessions and findSession operations will be moved from SessionStore to ReadOnlySessionStore to align with how other stores are design.

Reverse Key Ranges

Extend existing interface for reverse KeyValueStore

...

Code Block
public interface ReadOnlyWindowStore<K, V> {
    default WindowStoreIterator<V> backwardFetch(K key, Instant from, Instant to) throws IllegalArgumentException {
        throw new UnsupportedOperationException(); 
    }

    default KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, Instant fromTime, Instant toTime) throws IllegalArgumentException {
        throw new UnsupportedOperationException(); 
    }
    
    default KeyValueIterator<Windowed<K>, V> backwardAll() {
        throw new UnsupportedOperationException(); 
    }
    
    default KeyValueIterator<Windowed<K>, V> backwardFetchAll(Instant from, Instant to) throws IllegalArgumentException {
        throw new UnsupportedOperationException(); 
    }
}

public interface SessionStore<KReadOnlySessionStore<K, AGG> {
    // Moving read functions from SessionStore to ReadOnlySessionStore
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);

    KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);

    AGG fetchSession(final K key, final long startTime, final long endTime);

    // New 
    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
        throw new UnsupportedOperationException(); 
    }

    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
        throw new UnsupportedOperationException(); 
    }
}

public interface ReadOnlySessionStore<K, AGG> {
    default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K key) {
        throw new UnsupportedOperationException(); 
    }
    default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K from, final K to) {
        throw new UnsupportedOperationException(); 
    }
}

...