...
Code Block | ||
---|---|---|
| ||
# Throw to user exception - retryable public class StateStoreMigratedExceptionStreamThreadNotStartedException extends InvalidStateStoreException public class StateStoreRetryableExceptionStreamThreadRebalancingException extends InvalidStateStoreException public class StateStoreFailExceptionStateStoreMigratedException extends InvalidStateStoreException # Internal exceptionThrow to user exception - not retryable public class StreamThreadNotRunningException extends InvalidStateStoreException public class StateStoreClosedExceptionStateStoreFailedException extends InvalidStateStoreException # Internal exception public class StateStoreEmptyExceptionStateStoreIsEmptyException extends InvalidStateStoreException public class StreamThreadNotRunningExceptionStateStoreClosedException extends InvalidStateStoreException |
Three categories exception throw to the user:
- StateStoreRetryableException: The application instance in the state of rebalancing, the user just need retry and wait until rebalance finished.
StateStoreMigratedException: The store got migrated and not hosted in application instance, the users need to rediscover the store.
- StateStoreFailException: Fatal error when access state store, the user cannot retry or rediscover.
Three internal exceptions: StateStoreClosedException, StateStoreEmptyException, StreamThreadNotRunningException
The purpose of the internal exceptions is to distinguish between the different kinds of InvalidStateStoreException. The internal exception will be wrapped as user's category exception finally.
For example, StreamThreadStateStoreProvider#stores() will throw StreamThreadNotRunningException(internal exception). And then the internal exception will be wrapped as StateStoreRetryableException or StateStoreFailException during the KafkaStreams.store() and throw to the user.
The following is the public method that users will call:
- StateStoreClosedException: There will be throw StateStoreClosedException when state store is not open. This is an internal exception and will be wrapped to StateStoreMigratedException or StateStoreFailedException later.
- StateStoreIsEmptyException: will be thrown when state store cannot be found in all StateStoreProviders. This is an internal exception, and will be wrapped to StateStoreMigratedException or StateStoreFailedException later.
- StreamThreadNotRunningException: will be thrown when stream thread is not running and stream state is PENDING_SHUTDOWN /
NOT_RUNNING / ERROR. The user cannot retry when this exception is thrown. - StreamThreadNotStartedException: will be thrown when stream thread is not running and stream state is CREATED, the user can retry until to RUNNING.
- StreamThreadRebalancingException: will be thrown when stream thread is not running and stream state is REBALANCING, the user just retry and wait until rebalance finished(RUNNING).
- StateStoreMigratedException: A wrapper for StateStoreClosedException / StateStoreIsEmptyException. Will be thrown when stream thread is RUNNING/REBALANCING. The user need to rediscover the state store.
- StateStoreFailedException: A wrapper for StateStoreClosedException/StateStoreIsEmptyException. Will be thrown when stream thread is PENDING_SHUTDOWN / NOT_RUNNING / ERROR. The user cannot retry when this exception is thrown.
The interface QueryableStoreType will append a new method, this can be assign KafkaStreams instance to all ComposeReadOnlyXXXXStore instance.
Code Block | ||
---|---|---|
| ||
public interface QueryableStoreType<T> {
void setStreams(final KafkaStreams kafkaStreams);
} |
The following is the public method that users will call:
- KafkaStreams
- store()
- ReadOnlyKeyValueStore(CompositeReadOnlyKeyValueStore)
- get(k)
- range(from, to)
- all()
- approximateNumEntries()
- ReadOnlySessionStore(CompositeReadOnlySessionStore)
- fetch(k)
- fetch(from, to)
- ReadOnlyWindowStore(CompositeReadOnlyWindowStore)
- fetch(
- KafkaStreams
- store()
- ReadOnlyKeyValueStore(CompositeReadOnlyKeyValueStore)
- get(k)
- range(from, to)
- all()
- approximateNumEntries()
- ReadOnlySessionStore(CompositeReadOnlySessionStore)
- fetch(k)
- fetch(from, to)
- ReadOnlyWindowStore(CompositeReadOnlyWindowStore)
- fetch(k, rf, tt)
- fetch(from, to, rf, tt)
- all()
- fetchAll()
- KeyValueIterator(DelegatingPeekingKeyValueIterator)
- next()
- hasNext()
- peekNextKey()
- WindowStoreIterator(MeteredWindowStoreIterator)
- next()
- hasNext()
- peekNextKey()
Code Block | ||
---|---|---|
| ||
public class KafkaStreams { public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType); } public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> { public V get(final K key); public KeyValueIterator<K, V> range(final K from, final K to); public KeyValueIterator<K, V> all(); public long approximateNumEntries(); } public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore<K, V> { public KeyValueIterator<Windowed<K>, V> fetch(final K key); public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to); } public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K, V> { public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo); public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo); public KeyValueIterator<Windowed<K>, V> all(); public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo); } class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> { public synchronized boolean hasNext(); public synchronized KeyValue<K, V> next(); public KeyValue<K, V> peekNext(); } class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> { public boolean hasNext(); public KeyValue<Long, V> next(); public Long peekNextKey() } |
During user call one of above methods, we should check KafkaStreams state by the following rule when InvalidStateStoreException is thrown:
...
- StateStoreClosedException: should be wrapped to StateStoreRetryableException
- StreamThreadNotRunningException: should be wrapped to StateStoreRetryableException
- StateStoreEmptyException: should be wrapped to StateStoreMigratedException
...
Call Trace
Expand | ||
---|---|---|
| ||
|
...
Expand | ||
---|---|---|
| ||
|
...
Rejected Alternatives
None.