Status
Current state: Under Discussion Accepted(vote)
Discussion thread: here
JIRA:
Jira |
---|
server | ASF JIRA |
---|
columns | key,summary,type,created,updated,due,assignee,reporter,priority,status,resolution |
---|
serverId | 5aa69414-a9e9-3523-82ec-879b028fb15b |
---|
key | KAFKA-5876 |
---|
|
...
Because of that, we should throw different exceptions for each type.
Proposed Changes
The To distinguish different types of error, we need to handle all InvalidStateStoreException better during these public methods invoked. The main change is to introduce new exceptions that extend from InvalidStateStoreException. InvalidStateStoreException is not thrown at all anymore, but only new sub-classes.
Code Block |
---|
|
# Throw to user exception - retryable
public class StreamThreadNotStartedExceptionStreamsNotStartedException extends InvalidStateStoreException
public class StreamThreadRebalancingExceptionStreamsRebalancingException extends InvalidStateStoreException
public class StateStoreMigratedException extends InvalidStateStoreException
# Throw to user exception - not retryable
public class StreamThreadNotRunningExceptionStateStoreNotAvailableException extends InvalidStateStoreException
public class StateStoreFailedExceptionUnknownStateStoreException extends InvalidStateStoreException
# Internal exception
public class StateStoreIsEmptyExceptionInvalidStateStorePartitionException extends InvalidStateStoreException
public class StateStoreClosedException extends InvalidStateStoreException |
- StreamsNotStartedException: will be thrown when stream thread state is CREATED
- StateStoreClosedException: There will be throw StateStoreClosedException when state store is not open. This is an internal exception and will be wrapped to StateStoreMigratedException or StateStoreFailedException later.
- StateStoreIsEmptyException: will be thrown when state store cannot be found in all StateStoreProviders. This is an internal exception, and will be wrapped to StateStoreMigratedException or StateStoreFailedException later.
- StreamThreadNotRunningException: will be thrown when stream thread is not running and stream state is PENDING_SHUTDOWN /
NOT_RUNNING / ERROR. The user cannot retry when this exception is thrown. - StreamThreadNotStartedException: will be thrown when stream thread is not running and stream state is CREATED, the user can retry until to RUNNING.
- StreamThreadRebalancingExceptionStreamsRebalancingException: will be thrown when stream thread is not running and stream state is REBALANCINGis REBALANCING, the user just retry and wait until rebalance finished (RUNNING).
- StateStoreMigratedException: A wrapper for StateStoreClosedException / StateStoreIsEmptyException. Will will be thrown when state store already closed and stream thread state is RUNNING/REBALANCING. The The user need to rediscover the state store.the store and cannot blindly retry as the store handle is invalid and a new store handle must be retrived.
- StateStoreNotAvailableException: will be thrown when state store closed and stream state is PENDING_SHUTDOWN / NOTStateStoreFailedException: A wrapper for StateStoreClosedException/StateStoreIsEmptyException. Will be thrown when stream thread is PENDING_SHUTDOWN / NOT_RUNNING / ERROR. The The user cannot retry when this exception is thrown.
The interface QueryableStoreType will append a new method, this can be assign KafkaStreams instance to all ComposeReadOnlyXXXXStore instance.
Code Block |
---|
|
public interface QueryableStoreType<T> {
void setStreams(final KafkaStreams kafkaStreams);
} |
- UnknownStateStoreException: will be thrown when passing an unknown state store. The user cannot retry when this exception is thrown.
- InvalidStateStorePartitionException: will be thrown when user requested partition is not available on the stream instance.
The following is the public methods that users will call to get state store instance:
- KafkaStreams
- @Deprecated store(storeName, queryableStoreType)
- store(storeQureyParams)
Info |
---|
All the above methods could be throw exceptions: StreamsNotStartedException, StreamsRebalancingException, StateStoreMigratedException, StateStoreNotAvailableException, UnknownStateStoreException, InvalidStateStorePartitionException |
...
The following is the public method methods that users will call to get store values:
- KafkaStreams
- interface ReadOnlyKeyValueStore(class CompositeReadOnlyKeyValueStore)
- get(kkey)
- range(from, to)
- all()
- approximateNumEntries()
- ReadOnlySessionStoreinterface ReadOnlySessionStore(CompositeReadOnlySessionStoreclass CompositeReadOnlySessionStore)
- fetch(kkey)
- fetch(from, to)
- interface ReadOnlyWindowStore(class CompositeReadOnlyWindowStore)
ReadOnlyWindowStoreCompositeReadOnlyWindowStore- key, time)
fetch(
kkey,
rffrom,
ttto)
- fetch(from, to, rffromTime, tttoTime)
- all()
- fetchAll()
- KeyValueIterator(DelegatingPeekingKeyValueIterator)
- next()
- hasNext()
- peekNextKey()
- WindowStoreIterator(MeteredWindowStoreIterator)
- next()
- hasNext()
- peekNextKey()
...
...
...
...
- to)
- @Deprecated fetch(key, timeFrom, timeTo)
- @Deprecated fetch(from, to, timeFrom, timeTo)
- @Deprecated fetchAll(timeFrom, timeTo)
- interface KeyValueIterator(class DelegatingPeekingKeyValueIterator)
- next()
- hasNext()
- peekNextKey()
Info |
---|
All the above methods could be throw following exceptions: StreamsRebalancingException, StateStoreMigratedException, StateStoreNotAvailableException, InvalidStateStorePartitionException
|
Call Trace
Expand |
---|
title | Call trace 1: KafkaStreams#store() |
---|
|
- KafkaStreams#store() (v)
- QueryableStoreProvider#getStore() (v)
- GlobalStateStoreProvider#stores() (v)
- StreamThreadStateStroeProvider#stores() (v)
|
Expand |
---|
title | Call trace 2: ReadOnlyKeyValueStore#get() |
---|
|
- CompositeReadOnlyKeyValueStore#get() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredKeyValueBytesStore#get()
- InnerMeteredKeyValueStore#get()
- CachingKeyValueStore#get()
- AbstractStateStore#validateStoreOpen() (v)
- CachingKeyValueStore#getInternal()
- ChangeLoggingKeyValueBytesStore#get()
- RocksDBStore#get()
- RocksDBStore#validateStoreOpen() (v)
- RocksDBStore#getInternal()
|
Expand |
---|
title | Call trace 3: ReadOnlyKeyValueStore#range() |
---|
|
- CompositeReadOnlyKeyValueStore#range() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositKeyValueIterator#hasNext()
- NextIteratorFunction#apply() (v)
- MeteredKeyValueBytesStore#range()
- InnerMeteredKeyValueStore#range()
- CachingKeyValueStore#range()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingKeyValueBytesStore#range()
- RocksDBStore#range()
- RocksDBStore#validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return new MergedSortedCacheKeyValueBytesStoreIterator()
- return new MeteredKeyValueIterator()
- return
- return
- return
- CompositKeyValueIterator#next()
- MeteredKeyValueIterator#next()
- MergedSortedCacheKeyValueBytesStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- RocksDBRangeIterator#hasNext()
RocksDBIterator#hasNext() (v) - AbstractMergedStortedCacheStoreIterator#nextSrtoreValue()
- RocksDBRangeIterator#next()
RocksDBIterator#next()- RocksDbIterator#hasNext() (v)
- RocksDbIterator#getKeyValue()
- RocksIterator#next()
- return keyvalue entry
- return
- return
- return outerkeyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
|
Expand |
---|
title | Call trace 4: ReadOnlyKeyValueStore#all() |
---|
|
- CompositeReadOnlyKeyValueStore#all() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositKeyValueIterator#hasNext()
- NextIteratorFunction#apply() (v)
- MeteredKeyValueBytesStore#all()
- InnerMeteredKeyValueStore#all()
CachingKeyValueStore#all()- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingKeyValueBytesStore#all()
- RocksDBStore#all()
- RocksDBStore#validateStoreOpen() (v)
- return new RocksDBIterator()
- return
- return new MergedSortedCacheKeyValueBytesStoreIterator()
- return new MeteredKeyValueIterator()
- return
- return
- return
- CompositKeyValueIterator#next()
- MeteredKeyValueIterator#next()
- MergedSortedCacheKeyValueBytesStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()- MemoryLRUCacheBytesIterator.hasNext()
- DelegatingPeekingKeyValueIterator.hasNext() (v)
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- return
- return
- return outerkeyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
|
Expand |
---|
title | Call trace 5: ReadOnlyKeyValueStore#approximateNumEntries() |
---|
|
- CompositeReadOnlyKeyValueStore#approximateNumEntries() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredKeyValueBytesStore#approximateNumEntries()
- InnerMeteredKeyValueStore#approximateNumEntries()
- CachingKeyValueStore#approximateNumEntries()
- AbstractStateStore#validateStoreOpen() (v)
- RocksDBStore#approximateNumEntries()
- RocksDBStore#validateStoreOpen() (v)
- return value
- return
- return
- return
- return total
|
...
title | Call trace 6: ReadOnlySessionStore#fetch(key) |
---|
- CompositeReadOnlySessionStore#fetch(key) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredSessionStore#fetch(key)
- MeteredSessionStore#findSessions()
- CachingSessionStore#findSessions()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingSessionBytesStore#findSessions()
- RocksDBSessionStore.findSessions(k)
- RocksDBSessionStore.findSessions(from, to)
- RocksDBSegmentedBytesStore#fetch()
- SessionKeySchema#segmentsToSearch()
- Segments#segments() (v)
- Segments#getSegment()
- ConcurrentHashMap#get()
- Segments#isSegment()
- return segment
- retiurn segments
- return
- return new SegmentIterator()
- return new WrappedSessionStoreIterator()
- return
- return
- return new MergedSortedCacheSessionStoreIterator()
- return new MeteredWindowedKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheSessionStoreIterator#hasNext()
AbstraceMergedSortedCacheStoreIterator#hasNext()- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- return
- return iterator(MeteredWindowedKeyValueIterator)
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheSessionStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()- AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- MergedSortedCacheSessionStoreIterator#nextStoreValue()
AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedSessionStoreIterator#next()
- SegmentIterator#next()
- RocksDBRangeIterator#next()
RocksDbIterator#next()- RocksDbIterator#getKeyValue()
- RocksIterator#next()
- return entry
- return
- return
- return
- return
- return
...
Expand |
---|
title | Call trace 7: ReadOnlySessionStore#fetch(from, to) |
---|
|
- CompositeReadOnlySessionStore#fetch(from, to) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositKeyValueIterator#hasNext()
- NextIteratorFunction#apply(store)
- MeteredSessionStore#fetch(from, to)
- MeteredSessionStore#findSession(from, to, 0, t)
- CachingSessionStore#findSession(from, to, 0, t)
- CachingSessionStore#validStoreOpen()
AbstractStateStore#validStoreOpen() (v)
- ChangeLoggingSessionBytesStore#isOpen()
- RocksDBSessionStore#isOpen()
AbstractStateStore#isOpen()
- RocksDBSegmentedBytesStore#isOpen()
- return
- return
- return
- ChangeLoggingSessionBytesStore#findSesisons()
- RocksDBSessionStore#findSessions()
- RocksDBSegmentedBytesStore#fetch()
- SessionKeySchema#segmentsToSearch()
- Segments#segments() (v)
- return new SegmentIterator()
- return new WrappedSessionStoreIterator()
- return
- return
- return new MergedSortedCacheSessionStoreIterator()
- return MeteredWindowedKeyValueIterator()
- return
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheSessionStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)
- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- return
- DelegatingPeekingKeyValueIterator#next()
|
Expand |
---|
title | Call trace 8: ReadOnlyWindowStore#fetch(key) |
---|
|
- CompositeReadOnlySessionStore#fetch(key) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredWindowStore#fetch()
- CachingWindowStore#fetch(k, tf, tt)
- AbstractStateStore#validateStoreOpen()
- ChangeLoggingWindowBytesStore#isOpen()
- AbstractStateStore#isOpen()
- RocksDBWindowBytesStore#isOpen()
- RocksDBSegmentedBytesStore#isOpen()
- return
- return
- return
- return
- ChangeLoggingWindowBytesStore#fetch()
- RocksDBWindowStoreBytesStore#fetch()
- RocksDBSegmentedBytesStore#fetch()
- WindowKeySchema#segmentsToSearch()
- Segments#segments()
- RocksDBStore#isOpen()
- return segments
- return
- return new SegmentIterator()
- WindowStoreIteratorWrapper.bytesIterator()
- return new WrappedWindowStoreBytesIterator()
- return
- return
- ThreadCache#range()
- return new MemoryLRUCacheBytesIterator
- return new MergedSortedCacheWindowStoreIterator
- return new MeteredWindowStoreIterator
- MeteredWindowStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()- WrappedWindowStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment#range()
RocksDBStore#range()- RocksDBStore#validateStoreOpen()
- return RocksDBRangeIterator
- return
- return
- return
- return
- MeteredWindowStoreIterator#hasNext()
- MeteredSortedCacheWindowStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()- WrappedWindowStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment#range()
RocksDBStore#range()- RocksDBStore#validateStoreOpen() (v)
- return RocksDBRangeIterator
- return
- return
- return
- return
- MeteredWindowStoreIterator#next()
- MergedSortedCacheWindowStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()- WrappedWindowStoreBytesIterator#hasNext()
- SegmentIterator#hasNext()
- return
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedWindowStoreBytesIterator#next()
- SegmentIterator#next()
- SegmentIterator#hasNext()
- RocksDBRangeIterator#next()
- RocksDBRangeIterator#hasNext()
- RocksDBIterator#hasNext() (v)
- return
- RocksIterator#next()
- return entry
- return
- return keyvalue
- return
- return
- return keyvalue
|
Expand |
---|
title | Call trace 9: ReadOnlyWindowStore#fetch(from, to) |
---|
|
- CompositeReadOnlyWindowStore#fetch(from, to)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return
- return new DelegatingPeekingKeyValueIterator()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunciton#apply()
- MeteredWindowStore#fetch()
- CachingWindowStore#fetch()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingWindowBytesStore#fetch()
- RocksDBWindowBytesStore#fetch()
- RocksDBSegmentedBytesStore#fetch()
- WindowKeySchema#segmentsToSearch()
- Segments#segments() (v)
- return
- return SegmentIterator
- return keyvalueIterator
- return
- ThreadCache.range()
- return new MemoryLRUCacheBytesIterator
- WindowKeySchema#hasNextCondition()
- return new MergedSortedCacheWindowStoreKeyValueIterator()
- return MeteredWindowKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheWindowStoreKeyValueIterator#hasNext()
AbstractMergedSortedCacheStoreIterator
- WrappedWindowStoreBytesIterator#hasNext()
WrappedKeyValueIterator#hasNext()
- SegementIterator#hasNext()
- Segment#range()
RocksDBStore#range()- RocksDBStore#validateStoreOpen() (v)
- return RocksDBRangeIterator
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheWindowStoreKeyValueIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- AbstractMergedSortedCacheStoreIterator#hasNext()
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedWindowStoreBytesIterator#next()
- SegmentIteator#hasNext()
- SegmentIterator#next()
- SegmentIterator#hasNext() (v)
- RocksDBRangeIterator#next()
RocksDbIterator#next()- RocksDbIterator#hasNext()
- RocksIterator#next()
- return entry
- return
- return keyvalue
- return
- return keyvalue
- return
- return
- return
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext()
- return
|
Expand |
---|
title | Call trace 10: ReadOnlyWindowStore#all() |
---|
|
- CompositeReadOnlyWindowStore#all()
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return
- return new DelegatingPeekingKeyValueIterator()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunciton#apply()
- MeteredWindowStore#all()
- CachingWindowStore#all()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingWindowBytesStore#all()
- RocksDBWindowBytesStore#all()
- RocksDBSegmentedBytesStore#all()
- Segments#allSegments() (v)
- return new SegmentIterator()
- return keyValueIterator
- return
- ThreadCache#all()
- return new MemoryLRUCacheBytesIterator
- return new MergedSortedCacheWindowStoreKeyValueIterator
- return new MeteredWindowedKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheWindowStoreKeyValueIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext() (v)
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheWindowStoreKeyValueIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext()
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedKeyValueIterator#next()
- SegmentIterator#next()
- RocksDbIterator#next()
- RocksDbIterator#hasNext() (v)
- RocksIterator.next()
- return entry
- return
- return keyvalue
- return
- return
- return keyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext()
- return
|
Expand |
---|
title | Call trace 11: ReadOnlyWindowStore#fetchAll() |
---|
|
- CompositeReadOnlyWindowStore#fetchAll()
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return
- return new DelegatingPeekingKeyValueIterator()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunciton#apply()
- MeteredWindowStore#fetchAll()
- CachingWindowStore#fetchAll()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingWindowBytesStore#fetchAll()
- RocksDBWindowBytesStore#fetchAll()
- RocksDBSegmentedBytesStore#fetchAll()
- Segments#allSegments() (v)
- return new SegmentIterator()
- return keyValueIterator
- return
- ThreadCache#all()
- return new MemoryLRUCacheBytesIterator
- return new MergedSortedCacheWindowStoreKeyValueIterator
- return new MeteredWindowedKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheWindowStoreKeyValueIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext() (v)
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheWindowStoreKeyValueIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext() (v)
- return
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedKeyValueIterator#next()
- SegmentIterator#next()
- RocksDbIterator#next()
- RocksDbIterator#hasNext() (v)
- RocksIterator.next()
- return entry
- return
- return keyvalue
- return
- return
- return keyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext()
- return
|
Compatibility, Deprecation, and Migration Plan
...
Rejected Alternatives
None.
Text-to-speech function is limited to 200 characters
|