Status
Current state: Adopted
Discussion thread: here
Vote thread: here
JIRA:
...
Internally, both implementations: persistent (RocksDB), and in-memory (TreeMap) support reverse/descending iteration:
Code Block |
---|
final RocksIterator iter = db.newIterator();
iter.seekToFirst();
iter.next();

final RocksIterator reverse = db.newIterator();
reverse.seekToLast();
reverse.prev();
...
final TreeMap<String, String> map = new TreeMap<>();
final NavigableSet<String> nav = map.navigableKeySet();
final NavigableSet<String> rev = map.descendingKeySet();
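To make the in-memory case concrete, the following is a minimal sketch of forward versus descending key iteration over a TreeMap. The class name `DescendingKeysSketch` and its helper methods are illustrative assumptions and are not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustrative only: the class and method names are assumptions, not Kafka Streams code.
class DescendingKeysSketch {

    // Builds a small sorted map to iterate over.
    static TreeMap<String, String> sample() {
        final TreeMap<String, String> map = new TreeMap<>();
        map.put("b", "2");
        map.put("a", "1");
        map.put("c", "3");
        return map;
    }

    // Keys in ascending order, read through the navigable key-set view.
    static List<String> forwardKeys(final TreeMap<String, String> map) {
        return new ArrayList<>(map.navigableKeySet());
    }

    // Keys in descending order, read through the descending key-set view.
    static List<String> reverseKeys(final TreeMap<String, String> map) {
        return new ArrayList<>(map.descendingKeySet());
    }

    public static void main(final String[] args) {
        final TreeMap<String, String> map = sample();
        System.out.println(forwardKeys(map)); // prints [a, b, c]
        System.out.println(reverseKeys(map)); // prints [c, b, a]
    }
}
```

Both key sets are views backed by the same map, so the descending order is obtained without re-sorting the data.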
...
Reference issues
...
There are 2 important ranges in Kafka Streams Stores:
- Key Range
- Time Range
Info |
---|
For SessionStore/ReadOnlySessionStore: the findSessions and fetchSession operations will be moved from SessionStore to ReadOnlySessionStore to align with how other stores are designed. |
Reverse Key Ranges
Extend the existing KeyValueStore interface with reverse operations:
Code Block |
---|
public interface ReadOnlyKeyValueStore<K, V> {
    default KeyValueIterator<K, V> reverseRange(K from, K to) {
        throw new UnsupportedOperationException();
    }

    default KeyValueIterator<K, V> reverseAll() {
        throw new UnsupportedOperationException();
    }
}
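As a rough illustration of how an in-memory store could back these methods, here is a minimal sketch built on TreeMap. `SimpleReadOnlyStore` and `InMemoryStore` are hypothetical, simplified stand-ins (they return lists rather than a KeyValueIterator), and the inclusive handling of both range bounds is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: simplified stand-ins, not the Kafka Streams interfaces.
class ReverseRangeSketch {

    // Hypothetical read-only store; the reverse methods default to "unsupported".
    interface SimpleReadOnlyStore<K, V> {
        List<Map.Entry<K, V>> range(K from, K to);

        default List<Map.Entry<K, V>> reverseRange(K from, K to) {
            throw new UnsupportedOperationException();
        }

        default List<Map.Entry<K, V>> reverseAll() {
            throw new UnsupportedOperationException();
        }
    }

    // Hypothetical in-memory implementation that overrides both reverse methods.
    static final class InMemoryStore<K extends Comparable<K>, V> implements SimpleReadOnlyStore<K, V> {
        private final TreeMap<K, V> map = new TreeMap<>();

        void put(final K key, final V value) {
            map.put(key, value);
        }

        // Entries with from <= key <= to, smallest key first (bounds assumed inclusive).
        @Override
        public List<Map.Entry<K, V>> range(final K from, final K to) {
            return new ArrayList<>(map.subMap(from, true, to, true).entrySet());
        }

        // Same bounds as range(), but largest key first.
        @Override
        public List<Map.Entry<K, V>> reverseRange(final K from, final K to) {
            return new ArrayList<>(map.subMap(from, true, to, true).descendingMap().entrySet());
        }

        // Every entry, largest key first.
        @Override
        public List<Map.Entry<K, V>> reverseAll() {
            return new ArrayList<>(map.descendingMap().entrySet());
        }
    }
}
```

In this sketch the caller passes `from` and `to` in the same order for `range` and `reverseRange`; only the iteration direction differs.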
Backward Time Ranges
Window and Session stores are based on a set of KeyValue Stores (Segments) organized by a time-based index. Therefore, for these stores the time range matters more than the key range when looking up values.
Existing stores will be extended with backward methods:
Code Block |
---|
public interface ReadOnlyWindowStore<K, V> {
    default WindowStoreIterator<V> backwardFetch(K key, Instant from, Instant to) throws IllegalArgumentException {
        throw new UnsupportedOperationException();
    }

    default KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, Instant fromTime, Instant toTime) throws IllegalArgumentException {
        throw new UnsupportedOperationException();
    }

    default KeyValueIterator<Windowed<K>, V> backwardAll() {
        throw new UnsupportedOperationException();
    }

    default KeyValueIterator<Windowed<K>, V> backwardFetchAll(Instant from, Instant to) throws IllegalArgumentException {
        throw new UnsupportedOperationException();
    }
}

public interface ReadOnlySessionStore<K, AGG> {
    // Moving read functions from SessionStore to ReadOnlySessionStore
    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
        throw new UnsupportedOperationException("Moved from SessionStore");
    }

    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
        throw new UnsupportedOperationException("Moved from SessionStore");
    }

    default AGG fetchSession(final K key, final long startTime, final long endTime) {
        throw new UnsupportedOperationException("Moved from SessionStore");
    }

    // New
    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
        throw new UnsupportedOperationException();
    }

    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
        throw new UnsupportedOperationException();
    }

    default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K key) {
        throw new UnsupportedOperationException();
    }

    default KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K from, final K to) {
        throw new UnsupportedOperationException();
    }
}
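To illustrate what a backward time-range fetch means for a single key, here is a minimal sketch that indexes values by window start time. `BackwardFetchSketch` is a hypothetical toy structure, not the segment-based implementation used by Kafka Streams, and the inclusive time bounds are an assumption of the sketch.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustrative only: a toy single-key window index, not the Kafka Streams segment layout.
class BackwardFetchSketch {

    // Window start time (epoch millis) -> value stored for that window.
    private final TreeMap<Long, String> windows = new TreeMap<>();

    void put(final Instant windowStart, final String value) {
        windows.put(windowStart.toEpochMilli(), value);
    }

    // Values for windows starting in [from, to], earliest window first.
    List<String> fetch(final Instant from, final Instant to) {
        checkRange(from, to);
        return new ArrayList<>(
            windows.subMap(from.toEpochMilli(), true, to.toEpochMilli(), true).values());
    }

    // Same time range and argument order as fetch(), but latest window first.
    List<String> backwardFetch(final Instant from, final Instant to) {
        checkRange(from, to);
        return new ArrayList<>(
            windows.subMap(from.toEpochMilli(), true, to.toEpochMilli(), true)
                   .descendingMap()
                   .values());
    }

    private static void checkRange(final Instant from, final Instant to) {
        if (from.isAfter(to)) {
            throw new IllegalArgumentException("from must not be after to");
        }
    }
}
```

The direction is chosen by the method name rather than by swapping `from` and `to`, so an inverted range is rejected in both directions.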
...
And both SessionStore and WindowStore will be extended with interfaces on top of them:
public interface WindowStoreWithBackwardIteration<K, V> extends WindowStore<K, V>, ReadOnlyBackwardWindowStore<K, V> {
// ...
}
public interface SessionStoreWithBackwardIteration<K, AGG> extends SessionStore<K, AGG>, ReadOnlyBackwardSessionStore<K, AGG> {
//...
}
Class hierarchy will change from:
To:
Compatibility, Deprecation, and Migration Plan
Because the new top-level interfaces will be implemented by internal components (e.g. RocksDB, InMemory) that already support the old ones, current users won't be affected.
New users will be able to choose between current APIs (forward-only) or proposed ones (forward+backward).
...
New methods will have default implementations to avoid affecting current implementations.
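The effect of default implementations on existing code can be sketched as follows. All names here (`ReadOnlyStore`, `LegacyStore`, `UpdatedStore`, `supportsReverse`) are hypothetical and only illustrate the mechanism: an implementation written before the new method existed still compiles, and calling the new method on it throws UnsupportedOperationException.

```java
import java.util.List;

// Illustrative only: hypothetical types showing how default methods preserve compatibility.
class DefaultMethodSketch {

    interface ReadOnlyStore {
        List<String> all();

        // Newly added method; the default keeps older implementations compiling.
        default List<String> reverseAll() {
            throw new UnsupportedOperationException();
        }
    }

    // Written before reverseAll() existed; needs no changes to keep compiling.
    static final class LegacyStore implements ReadOnlyStore {
        @Override
        public List<String> all() {
            return List.of("a", "b");
        }
    }

    // Opts in to backward iteration by overriding the default.
    static final class UpdatedStore implements ReadOnlyStore {
        @Override
        public List<String> all() {
            return List.of("a", "b");
        }

        @Override
        public List<String> reverseAll() {
            return List.of("b", "a");
        }
    }

    // Probes whether a store overrides the new method.
    static boolean supportsReverse(final ReadOnlyStore store) {
        try {
            store.reverseAll();
            return true;
        } catch (final UnsupportedOperationException e) {
            return false;
        }
    }
}
```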
Rejected Alternatives
- Create a parallel hierarchy of interfaces for backward operation. Even though this option seems like the best way to extend functionality, it proved unworkable in practice during the KIP-614 discussion: interfaces get wrapped in different layers (Metered, Caching, Logging), so the entire current hierarchy used to create stores with the Kafka Streams DSL would have to be duplicated.
- Initially it was considered to have an additional parameter on all readOnlyStore methods, e.g. Store#fetch(keyFrom, keyTo, timeFrom, timeTo, ReadDirection.FORWARD|BACKWARD), but this was declined as passing arguments in inverse is more intuitive. As this could cause unexpected effects in future versions, a flag has been added to overcome this.
- Implicit ordering by flipping the from and to variables has been discouraged in favor of a more explicit approach based on new interfaces that make explicit the availability of reverse and backward fetch operations.