(Working in Progress)
...
Because of that, we should throw different exceptions for each type.
Proposed Changes
There are add four new exceptions:
...
- KafkaStreams
- ReadOnlyKeyValueStore(CompositeReadOnlyKeyValueStore)
- get(k)
- range(from, to)
- all()
- approximateNumEntries()
- ReadOnlySessionStore(CompositeReadOnlySessionStore)
- ReadOnlyWindowStore(CompositeReadOnlyWindowStore)
- fetch(k, rf, tt)
- fetch(from, to, rf, tt)
- all()
- fetchAll()
- KeyValueIterator(DelegatingPeekingKeyValueIterator)
- next()
- hasNext()
- peekNextKey()
During user call one of above methods, we should check KafkaStreams state by the following rule when InvalidStateStoreException is thrown:
- If state is RUNNING or REBALANCING
- StateStoreClosedException: should be wrapped to StateStoreRetriableException
- StateStoreMigratedException: should not be wrapped, directly thrown
- if state is PENDING_SHUTDOW or ERROR or NOT_RUNNING
- wrap InvalidStateStoreException to StateStoreFailException
Call Trace
Expand |
---|
title | Call trace 1: KafkaStreams#store() |
---|
|
- KafkaStreams#store() (v)
- QueryableStoreProvider#getStore() (v)
- GlobalStateStoreProvider#stores() (v)
- StreamThreadStateStroeProvider#stores() (v)
|
Expand |
---|
title | Call trace 2: ReadOnlyKeyValueStore#get() |
---|
|
- CompositeReadOnlyKeyValueStore#get() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredKeyValueBytesStore#get()
- InnerMeteredKeyValueStore#get()
- CachingKeyValueStore#get()
- AbstractStateStore#validateStoreOpen() (v)
- CachingKeyValueStore#getInternal()
- ChangeLoggingKeyValueBytesStore#get()
- RocksDBStore#get()
- RocksDBStore#validateStoreOpen() (v)
- RocksDBStore#getInternal()
|
Expand |
---|
title | Call trace 3: ReadOnlyKeyValueStore#range() |
---|
|
- CompositeReadOnlyKeyValueStore#range() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositKeyValueIterator#hasNext()
- NextIteratorFunction#apply() (v)
- MeteredKeyValueBytesStore#range()
- InnerMeteredKeyValueStore#range()
- CachingKeyValueStore#range()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingKeyValueBytesStore#range()
- RocksDBStore#range()
- RocksDBStore#validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return new MergedSortedCacheKeyValueBytesStoreIterator()
- return new MeteredKeyValueIterator()
- return
- return
- return
- CompositKeyValueIterator#next()
- MeteredKeyValueIterator#next()
- MergedSortedCacheKeyValueBytesStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- RocksDBRangeIterator#hasNext()
RocksDBIterator#hasNext() (v) - AbstractMergedStortedCacheStoreIterator#nextSrtoreValue()
- RocksDBRangeIterator#next()
RocksDBIterator#next()- RocksDbIterator#hasNext() (v)
- RocksDbIterator#getKeyValue()
- RocksIterator#next()
- return keyvalue entry
- return
- return
- return outerkeyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
|
- WindowStoreIterator(MeteredWindowStoreIterator)
- next()
- hasNext()
- peekNextKey()
Code Block |
---|
|
public class KafkaStreams {
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType);
}
public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> {
public V get(final K key);
public KeyValueIterator<K, V> range(final K from, final K to);
public KeyValueIterator<K, V> all();
public long approximateNumEntries();
}
public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore<K, V> {
public KeyValueIterator<Windowed<K>, V> fetch(final K key);
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to);
}
public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K, V> {
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo);
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo);
public KeyValueIterator<Windowed<K>, V> all();
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo);
}
class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> {
public synchronized boolean hasNext();
public synchronized KeyValue<K, V> next();
public KeyValue<K, V> peekNext();
}
class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
public boolean hasNext();
public KeyValue<Long, V> next();
public Long peekNextKey()
} |
During user call one of above methods, we should check KafkaStreams state by the following rule when InvalidStateStoreException is thrown:
- If state is RUNNING or REBALANCING:
- StateStoreClosedException: should be wrapped to StateStoreRetriableException
- StateStoreMigratedException: should not be wrapped, directly thrown
- if state is PENDING_SHUTDOW or ERROR or NOT_RUNNING:
- wrap InvalidStateStoreException to StateStoreFailException
Call Trace
Expand |
---|
title | Call trace 1: KafkaStreams#store() |
---|
|
- KafkaStreams#store() (v)
- QueryableStoreProvider#getStore() (v)
- GlobalStateStoreProvider#stores() (v)
- StreamThreadStateStroeProvider#stores() (v)
|
Expand |
---|
title | Call trace 2: ReadOnlyKeyValueStore#get() |
---|
|
- CompositeReadOnlyKeyValueStore#get() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredKeyValueBytesStore#get()
- InnerMeteredKeyValueStore#get()
- CachingKeyValueStore#get()
- AbstractStateStore#validateStoreOpen() (v)
- CachingKeyValueStore#getInternal()
- ChangeLoggingKeyValueBytesStore#get()
- RocksDBStore#get()
- RocksDBStore#validateStoreOpen() (v)
- RocksDBStore#getInternal()
|
Expand |
---|
title | Call trace 3: ReadOnlyKeyValueStore#range() |
---|
|
- CompositeReadOnlyKeyValueStore#range() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositKeyValueIterator#hasNext()
- NextIteratorFunction#apply() (v)
- MeteredKeyValueBytesStore#range()
- InnerMeteredKeyValueStore#range()
- CachingKeyValueStore#range()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingKeyValueBytesStore#range()
- RocksDBStore#range()
- RocksDBStore#validateStoreOpen
|
Expand |
---|
title | Call trace 4: ReadOnlyKeyValueStore#all() |
---|
|
- CompositeReadOnlyKeyValueStore#all() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores
DelegatingPeekingKeyValueIterator<> DelegatingPeekingKeyValueIterator#hasNext- return new MergedSortedCacheKeyValueBytesStoreIterator()
- return new MeteredKeyValueIterator(
v- CompositKeyValueIterator#nextCompositKeyValueIterator#hasNext()
- NextIteratorFunction#applyMeteredKeyValueIterator#next()
- MergedSortedCacheKeyValueBytesStoreIterator#next(
v- )
MeteredKeyValueBytesStore#allAbstractMergedSortedCacheStoreIterator#next()InnerMeteredKeyValueStore#all
- RocksDBRangeIterator#hasNext()
CachingKeyValueStore#all- RocksDBIterator#hasNext() (v)
AbstractStateStore#validateStoreOpen- AbstractMergedStortedCacheStoreIterator#nextSrtoreValue(
vChangeLoggingKeyValueBytesStore#all- RocksDBRangeIterator#next()
RocksDBStore#allRocksDBStore#validateStoreOpen- RocksDbIterator#hasNext() (v)
return new RocksDBIterator- RocksDbIterator#getKeyValue()
- RocksIterator#next()
- return keyvalue entry
- return
new MergedSortedCacheKeyValueBytesStoreIterator() new MeteredKeyValueIterator()- CompositKeyValueIterator#nextDelegatingPeekingKeyValueIterator#next()MeteredKeyValueIterator#next()
MergedSortedCacheKeyValueBytesStoreIterator#next
|
Expand |
---|
title | Call trace 4: ReadOnlyKeyValueStore#all() |
---|
|
- AbstractMergedSortedCacheStoreIterator#nextCompositeReadOnlyKeyValueStore#all() MemoryLRUCacheBytesIterator.hasNext(v)
- DelegatingPeekingKeyValueIterator.hasNextWrappingStoreProvider#stores() (v)
- AbstractMergedSortedCacheStoreIterator#nextStoreValueStreamThreadStateStoreProvider#stores() (v)DelegatingPeekingKeyValueIterator#next
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- return
- return
- return outerkeyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
|
Expand |
---|
title | Call trace 5: ReadOnlyKeyValueStore#approximateNumEntries() |
---|
|
title | Call trace 6: ReadOnlySessionStore#fetch(key- CompositKeyValueIterator#hasNext()
- NextIteratorFunction#apply() (v)
- MeteredKeyValueBytesStore#all()
- InnerMeteredKeyValueStore#all()
CachingKeyValueStore#all()- AbstractStateStore#validateStoreOpen
CompositeReadOnlyKeyValueStore#approximateNumEntries() (v)- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores
MeteredKeyValueBytesStore#approximateNumEntries- ChangeLoggingKeyValueBytesStore#all()
InnerMeteredKeyValueStore#approximateNumEntries- CachingKeyValueStore#approximateNumEntries
- AbstractStateStore#validateStoreOpen
- RocksDBStore#validateStoreOpen()
- ) (
- RocksDBStore#isOpen
- return new RocksDBIterator()
RocksDBStore#approximateNumEntries- return new MergedSortedCacheKeyValueBytesStoreIterator()
- RocksDBStore#validateStoreOpenreturn new MeteredKeyValueIterator() (v)
- return value
- return
- return
- return
- return total
|
---|
Expand |
---|
- CompositKeyValueIterator#next()
CompositeReadOnlySessionStore#fetch- MeteredKeyValueIterator#next(
key- )
- MergedSortedCacheKeyValueBytesStoreIterator#next(
v- WrappingStoreProvider#stores
- AbstractMergedSortedCacheStoreIterator#next()
- MemoryLRUCacheBytesIterator.hasNext(
v- StreamThreadStateStoreProvider#stores
- DelegatingPeekingKeyValueIterator.hasNext() (v)
MeteredSessionStore#fetch- AbstractMergedSortedCacheStoreIterator#nextStoreValue(
key- MeteredSessionStore#findSessions
- DelegatingPeekingKeyValueIterator#next()
CachingSessionStore#findSessions- AbstractStateStore#validateStoreOpen()
- DelegatingPeekingKeyValueIterator#hasNext()
- ChangeLoggingSessionBytesStore#findSessions()RocksDBSessionStore.findSessions(k)RocksDBSessionStore.findSessions(from, to)RocksDBSegmentedBytesStore#fetch()SessionKeySchema#segmentsToSearch()Segments#segments
- return
- return outerkeyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
|
Expand |
---|
title | Call trace 5: ReadOnlyKeyValueStore#approximateNumEntries() |
---|
|
- CompositeReadOnlyKeyValueStore#approximateNumEntries() (v)
- Segments#getSegmentWrappingStoreProvider#stores() ConcurrentHashMap#get(v)
- Segments#isSegmentStreamThreadStateStoreProvider#stores()
- return segment
- retiurn segments
- return
- return new SegmentIterator()
- return new WrappedSessionStoreIterator()
- return
- return
- return new MergedSortedCacheSessionStoreIterator()
- return new MeteredWindowedKeyValueIterator()
- return
- (v)
- MeteredKeyValueBytesStore#approximateNumEntries()
- InnerMeteredKeyValueStore#approximateNumEntries()
- CachingKeyValueStore#approximateNumEntries()
- AbstractStateStore#validateStoreOpen() (v)
- RocksDBStore#approximateNumEntries()
- RocksDBStore#validateStoreOpen() (v)
- return value
MeteredWindowedKeyValueIterator#hasNext()- MergedSortedCacheSessionStoreIterator#hasNext()
AbstraceMergedSortedCacheStoreIterator#hasNext()- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to) - RocksDBStore.validateStoreOpen() (v)
return new RocksDBRangeIterator()
- return
- return
- return
return iterator(MeteredWindowedKeyValueIteratortotal |
Expand |
---|
title | Call trace 6: ReadOnlySessionStore#fetch(key) |
---|
|
- MeteredWindowedKeyValueIterator#nextCompositeReadOnlySessionStore#fetch(key) MergedSortedCacheSessionStoreIterator#next (v)
- AbstractMergedSortedCacheStoreIterator#nextWrappingStoreProvider#stores() AbstractMergedSortedCacheStoreIterator#hasNext (v)
- FilteredCacheIterator#hasNextStreamThreadStateStoreProvider#stores() WrappedSessionStoreIterator#hasNext (v)SegmentIterator#hasNext
- MeteredSessionStore#fetch(key)
- Segment.range(from, toMeteredSessionStore#findSessions()
- RocksDBStore.range(from, toCachingSessionStore#findSessions()
- RocksDBStore.AbstractStateStore#validateStoreOpen() (v)
- return new RocksDBRangeIteratorChangeLoggingSessionBytesStore#findSessions()
- return
- return
MergedSortedCacheSessionStoreIterator#- RocksDBSessionStore.findSessions(k)
- RocksDBSessionStore.findSessions(from, to)
- RocksDBSegmentedBytesStore#fetch
nextStoreValue() AbstractMergedSortedCacheStoreIterator#nextStoreValue- WrappedSessionStoreIterator#next
- SessionKeySchema#segmentsToSearch()
SegmentIterator#next- RocksDBRangeIterator#next
- RocksDbIterator#next
- RocksDbIterator#getKeyValueRocksIterator#next
- Segments#isSegment()
- return
entry- return new SegmentIterator()
- return new WrappedSessionStoreIterator()
- return
- return
|
Expand |
---|
title | Call trace 7: ReadOnlySessionStore#fetch(from, to) |
---|
|
- return new MergedSortedCacheSessionStoreIterator()
- return new MeteredWindowedKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheSessionStoreIterator#hasNext()
AbstraceMergedSortedCacheStoreIterator#hasNext()- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)- RocksDBStore.validateStoreOpen
CompositeReadOnlySessionStore#fetch(from, to) (v)- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores
DelegatingPeekingKeyValueIterator<>- return iterator(MeteredWindowedKeyValueIterator)
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheSessionStoreIterator#next
DelegatingPeekingKeyValueIterator#hasNext() (v)- CompositKeyValueIterator#hasNext()
NextIteratorFunction#applyAbstractMergedSortedCacheStoreIterator#next(store)- MeteredSessionStore#fetchAbstractMergedSortedCacheStoreIterator#hasNext(from, to)MeteredSessionStore#findSession
- (from, to, 0, tFilteredCacheIterator#hasNext()CachingSessionStore#findSession(from,
- to, 0, tWrappedSessionStoreIterator#hasNext()
- CachingSessionStore#validStoreOpenSegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIteratorAbstractStateStore#validStoreOpen() (v)
ChangeLoggingSessionBytesStore#isOpen()RocksDBSessionStore#isOpen() AbstractStateStore#isOpen() RocksDBSegmentedBytesStore#isOpen()
- return
- return
- return
ChangeLoggingSessionBytesStore#findSesisons
- MergedSortedCacheSessionStoreIterator#nextStoreValue()
RocksDBSessionStore#findSessionsAbstractMergedSortedCacheStoreIterator#nextStoreValue()
- RocksDBSegmentedBytesStore#fetchWrappedSessionStoreIterator#next()
- SessionKeySchema#segmentsToSearchSegmentIterator#next()
- Segments#segmentsRocksDBRangeIterator#next()
RocksDbIterator#next(v)- RocksDBStore#isOpenRocksDbIterator#getKeyValue()
- return new SegmentIteratorRocksIterator#next()
- return new WrappedSessionStoreIterator()entry
- return
- return
- return
- return new MergedSortedCacheSessionStoreIterator()
- return MeteredWindowedKeyValueIterator()
- return
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheSessionStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)
RocksDBStore.validateStoreOpen
|
Expand |
---|
title | Call trace 7: ReadOnlySessionStore#fetch(from, to) |
---|
|
- CompositeReadOnlySessionStore#fetch(from, to) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
return new RocksDBRangeIterator- CompositKeyValueIterator#hasNext()
- return
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- return
DelegatingPeekingKeyValueIterator#next- NextIteratorFunction#apply(store)
- MeteredSessionStore#fetch(from, to)
- MeteredSessionStore#findSession(from, to, 0, t)
- CachingSessionStore#findSession(from, to, 0, t)
- CachingSessionStore#validStoreOpen()
AbstractStateStore#validStoreOpen() (v)
- ChangeLoggingSessionBytesStore#isOpen()
- RocksDBSessionStore#isOpen()
AbstractStateStore#isOpen()
- RocksDBSegmentedBytesStore#isOpen()
- return
keyvalue
|
...
Changed Classes
...
...
- ChangeLoggingSessionBytesStore#findSesisons()
- RocksDBSessionStore#findSessions()
- RocksDBSegmentedBytesStore#fetch()
- SessionKeySchema#segmentsToSearch()
- Segments#segments() (v)
- return new SegmentIterator()
- return new WrappedSessionStoreIterator()
- return
- return
- return new MergedSortedCacheSessionStoreIterator()
- return MeteredWindowedKeyValueIterator()
- return
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheSessionStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)
- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- return
- DelegatingPeekingKeyValueIterator#next()
|
Expand |
---|
title | Call trace 8: ReadOnlyWindowStore#fetch |
---|
|
- CompositeReadOnlySessionStore#fetch(key) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredWindowStore#fetch()
- CachingWindowStore#fetch(k, tf, tt)
- AbstractStateStore#validateStoreOpen()
- ChangeLoggingWindowBytesStore#isOpen()
- AbstractStateStore#isOpen()
- RocksDBWindowBytesStore#isOpen()
- RocksDBSegmentedBytesStore#isOpen()
- return
- return
- return
- return
- ChangeLoggingWindowBytesStore#fetch()
- RocksDBWindowStoreBytesStore#fetch()
- RocksDBSegmentedBytesStore#fetch()
- WindowKeySchema#segmentsToSearch()
- Segments#segments()
- RocksDBStore#isOpen()
- return segments
- return
- return new SegmentIterator()
- WindowStoreIteratorWrapper.bytesIterator()
- return new WrappedWindowStoreBytesIterator()
- return
- return
- ThreadCache#range()
- return new MemoryLRUCacheBytesIterator
- return new MergedSortedCacheWindowStoreIterator
- return new MeteredWindowStoreIterator
- MeteredWindowStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()- WrappedWindowStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment#range()
RocksDBStore#range()- RocksDBStore#validateStoreOpen()
- return RocksDBRangeIterator
- return
- return
- return
- return
- MeteredWindowStoreIterator#hasNext()
- MeteredSortedCacheWindowStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()- WrappedWindowStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment#range()
RocksDBStore#range()- RocksDBStore#validateStoreOpen() (v)
- return RocksDBRangeIterator
- return
- return
- return
- return
- MeteredWindowStoreIterator#next()
- MergedSortedCacheWindowStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()- WrappedWindowStoreBytesIterator#hasNext()
- SegmentIterator#hasNext()
- return
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedWindowStoreBytesIterator#next()
- SegmentIterator#next()
- SegmentIterator#hasNext()
- RocksDBRangeIterator#next()
- RocksDBRangeIterator#hasNext()
- RocksDBIterator#hasNext() (v)
- return
- RocksIterator#next()
- return entry
- return
- return keyvalue
- return
- return
- return keyvalue
|
Changes in call trace
Code Block |
---|
|
public interface QueryableStoreType<T> {
// TODO: pass stream instance parameter
T create(final KafkaStreams streams, final StateStoreProvider storeProvider, final String storeName);
} |
...