Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: "Under Discussion"Adopted

Discussion thread: here

Vote Discussion thread: here

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-9929

...

Internally, both implementations: persistent (RocksDB), and in-memory (TreeMap) support reverse/descending iteration:




Code Block
final RocksIterator iter = db.newIterator();
iter.seekToFirst();
iter.next();
final RocksIterator reverse = db.newIterator();
reverse.seekToLast();
reverse.prev();

...


final 

...

TreeMap<String, 

...

String> map = new 

...

TreeMap<>();
final 

...

NavigableSet<String> nav = map.navigableKeySet();
final 

...

NavigableSet<String> rev = map.descendingKeySet();

...



Reference issues

...

There are 2 important ranges in Kafka Streams Stores:

  • Key Range
  • Time Range


Info

For SessionStore/ReadOnlySessionStore: findSessions and findSession operations will be moved from SessionStore to ReadOnlySessionStore to align with how other stores are design.

Reverse Key Ranges

Extend existing interface for reverse KeyValueStore

...

Code Block
public interface ReadOnlyWindowStore<K, V> {
    default WindowStoreIterator<V> backwardFetch(K key, Instant from, Instant to) throws IllegalArgumentException {
        throw new UnsupportedOperationException(); 
    }

    default KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, Instant fromTime, Instant toTime) throws IllegalArgumentException {
        throw new UnsupportedOperationException(); 
    }
    
    default KeyValueIterator<Windowed<K>, V> backwardAll() {
        throw new UnsupportedOperationException(); 
    }
    
    default KeyValueIterator<Windowed<K>, V> backwardFetchAll(Instant from, Instant to) throws IllegalArgumentException {
        throw new UnsupportedOperationException(); 
    }
}

public interface SessionStore<KReadOnlySessionStore<K, AGG> {
    // Moving read functions from SessionStore to ReadOnlySessionStore
    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessionsfindSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
        throw new UnsupportedOperationException("Moved from SessionStore"); 
    }

    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessionsfindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
        throw new UnsupportedOperationException("Moved from SessionStore"); 
    }
}

public interface ReadOnlySessionStore<K, AGG> {
    default AGG fetchSession(final K key, final long startTime, final long endTime) {
        throw new UnsupportedOperationException("Moved from SessionStore"); 
    }

    // New 
    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
        throw new UnsupportedOperationException(); 
    }

    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
        throw new UnsupportedOperationException(); 
    }

    default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K key) {
        throw new UnsupportedOperationException(); 
    }
    default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K from, final K to) {
        throw new UnsupportedOperationException(); 
    }
}

...