...
Three new exceptions are added:
Code Block |
---|
|
// public class StateStoreClosedException extends InvalidStateStoreException
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException |
...
- KafkaStreams
- ReadOnlyKeyValueStore(CompositeReadOnlyKeyValueStore)
  - get(k)
  - range(from, to)
  - all()
  - approximateNumEntries()
- ReadOnlySessionStore(CompositeReadOnlySessionStore)
- ReadOnlyWindowStore(CompositeReadOnlyWindowStore)
  - fetch(k, rf, tt)
  - fetch(from, to, rf, tt)
  - all()
  - fetchAll()
- KeyValueIterator(DelegatingPeekingKeyValueIterator)
  - next()
  - hasNext()
  - peekNextKey()
When a user calls one of the above methods and an InvalidStateStoreException is thrown, we should check the KafkaStreams state and apply the following rules:
- If the state is RUNNING or REBALANCING
  - StateStoreMigratedException: should not be wrapped; it is thrown directly
  - any other InvalidStateStoreException (such as StateStoreClosedException): should be wrapped in StateStoreRetryableException
- If the state is PENDING_SHUTDOWN, ERROR, or NOT_RUNNING
  - wrap InvalidStateStoreException in StateStoreFailException
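The state-based rule above can be sketched in plain Java. This is a minimal illustration, not the actual patch: the exception classes and the `State` enum are redeclared locally so the snippet compiles without Kafka on the classpath, the `translate` helper and its messages are hypothetical, and the retryable exception is spelled as in the class declarations listed earlier.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch only: local stand-ins for the types named in this proposal.
class StateStoreExceptionSketch {

    // Subset of the KafkaStreams states referenced by the rule.
    enum State { RUNNING, REBALANCING, PENDING_SHUTDOWN, ERROR, NOT_RUNNING }

    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(String message, Throwable cause) { super(message, cause); }
    }

    static class StateStoreMigratedException extends InvalidStateStoreException {
        StateStoreMigratedException(String message, Throwable cause) { super(message, cause); }
    }

    static class StateStoreRetryableException extends InvalidStateStoreException {
        StateStoreRetryableException(String message, Throwable cause) { super(message, cause); }
    }

    static class StateStoreFailException extends InvalidStateStoreException {
        StateStoreFailException(String message, Throwable cause) { super(message, cause); }
    }

    // States in which a failed store access may succeed on a later attempt.
    private static final Set<State> RETRYABLE_STATES =
            EnumSet.of(State.RUNNING, State.REBALANCING);

    // Hypothetical helper: maps an exception raised by a store call to the
    // exception the caller should see, based on the current client state.
    static InvalidStateStoreException translate(State state, InvalidStateStoreException e) {
        if (RETRYABLE_STATES.contains(state)) {
            if (e instanceof StateStoreMigratedException) {
                return e; // thrown directly, not wrapped
            }
            return new StateStoreRetryableException("store temporarily unavailable", e);
        }
        // PENDING_SHUTDOWN, ERROR, NOT_RUNNING: the store will not come back.
        return new StateStoreFailException("store unavailable, client state is " + state, e);
    }
}
```

A wrapper around each of the listed methods would catch InvalidStateStoreException and rethrow `translate(currentState, e)`, keeping the original exception as the cause.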
...
Expand |
---|
title | Call trace 1: KafkaStreams#store() |
---|
|
- KafkaStreams#store() (v)
- QueryableStoreProvider#getStore() (v)
- GlobalStateStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
|
...
Expand |
---|
title | Call trace 4: ReadOnlyKeyValueStore#all() |
---|
|
- CompositeReadOnlyKeyValueStore#all() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply() (v)
- MeteredKeyValueBytesStore#all()
- InnerMeteredKeyValueStore#all()
CachingKeyValueStore#all()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingKeyValueBytesStore#all()
- RocksDBStore#all()
- RocksDBStore#validateStoreOpen() (v)
- return new RocksDBIterator()
- return
- return new MergedSortedCacheKeyValueBytesStoreIterator()
- return new MeteredKeyValueIterator()
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredKeyValueIterator#next()
- MergedSortedCacheKeyValueBytesStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- MemoryLRUCacheBytesIterator.hasNext()
- DelegatingPeekingKeyValueIterator.hasNext() (v)
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- return
- return
- return outerkeyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
|
...
Expand |
---|
title | Call trace 7: ReadOnlySessionStore#fetch(from, to) |
---|
|
- CompositeReadOnlySessionStore#fetch(from, to) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply(store)
- MeteredSessionStore#fetch(from, to)
- MeteredSessionStore#findSessions(from, to, 0, t)
- CachingSessionStore#findSessions(from, to, 0, t)
- CachingSessionStore#validateStoreOpen()
AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingSessionBytesStore#isOpen()
- RocksDBSessionStore#isOpen()
AbstractStateStore#isOpen()
- RocksDBSegmentedBytesStore#isOpen()
- return
- return
- return
- ChangeLoggingSessionBytesStore#findSessions()
- RocksDBSessionStore#findSessions()
- RocksDBSegmentedBytesStore#fetch()
- SessionKeySchema#segmentsToSearch()
- Segments#segments() (v)
- return new SegmentIterator()
- return new WrappedSessionStoreIterator()
- return
- return
- return new MergedSortedCacheSessionStoreIterator()
- return new MeteredWindowedKeyValueIterator()
- return
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheSessionStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)
- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- return
- DelegatingPeekingKeyValueIterator#next()
|
Changed Classes
Code Block |
---|
|
public interface QueryableStoreType<T> {
T create(final KafkaStreams streams, final StateStoreProvider storeProvider, final String storeName);
} |
Code Block |
---|
language | java |
---|
title | KafkaStreams#store() |
---|
collapse | true |
---|
|
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
validateIsRunning();
return queryableStoreProvider.getStore(storeName, queryableStoreType);
} |
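From the caller's side, splitting retryable from fatal failures is meant to make handling straightforward. The following is a rough sketch under stated assumptions: the exception classes are local stand-ins for the ones proposed here, and `queryWithRetry`, `maxAttempts`, and `backoffMs` are hypothetical names, not part of any Kafka API.

```java
import java.util.function.Supplier;

// Sketch only: local stand-ins for the exception types proposed in this document.
class QueryRetrySketch {

    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(String message) { super(message); }
    }

    static class StateStoreRetryableException extends InvalidStateStoreException {
        StateStoreRetryableException(String message) { super(message); }
    }

    static class StateStoreFailException extends InvalidStateStoreException {
        StateStoreFailException(String message) { super(message); }
    }

    // Hypothetical helper: runs the query, retrying only while the failure is
    // retryable. Any other exception, including StateStoreFailException,
    // propagates to the caller immediately.
    static <T> T queryWithRetry(Supplier<T> query, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        StateStoreRetryableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (StateStoreRetryableException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        // Preserve the interrupt flag and give up retrying.
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }
            }
        }
        throw last; // all attempts failed with a retryable exception
    }
}
```

In an application, the supplier would typically wrap both the `store()` lookup and the read on the returned store, so that a store that migrated or closed between the two calls is looked up again on the next attempt.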
...