(Work in Progress)
Status
Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Interactive Queries (IQ) throw InvalidStateStoreException for every type of error, so a user cannot handle different types of error differently. We should therefore throw a distinct exception for each type.
Proposed Changes
Three new exceptions are added:
// public class StateStoreClosedException extends InvalidStateStoreException
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException
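As a minimal sketch of how a caller might benefit from the proposed hierarchy, the example below defines stand-in versions of the exception classes and a hypothetical `queryWithRetry` helper. The nested stand-ins, their constructors, and the helper are assumptions made for illustration and are not part of the proposal. The helper retries only on the retryable exception and lets every other exception propagate.

```java
import java.util.function.Supplier;

final class IqRetrySketch {
    private IqRetrySketch() { }

    // Stand-ins for the proposed classes; the constructors are assumptions.
    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(final String message) { super(message); }
    }

    static class StateStoreMigratedException extends InvalidStateStoreException {
        StateStoreMigratedException(final String message) { super(message); }
    }

    static class StateStoreRetryableException extends InvalidStateStoreException {
        StateStoreRetryableException(final String message) { super(message); }
    }

    static class StateStoreFailException extends InvalidStateStoreException {
        StateStoreFailException(final String message) { super(message); }
    }

    // Hypothetical caller-side helper: runs the query up to maxAttempts times,
    // retrying only while the store reports a retryable condition.
    static <T> T queryWithRetry(final Supplier<T> query, final int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        StateStoreRetryableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (final StateStoreRetryableException e) {
                last = e; // a real caller would likely back off before retrying
            }
        }
        throw last; // all attempts hit a retryable error
    }
}
```

A caller could wrap a store lookup in `queryWithRetry` and separately catch the migrated or fail exceptions to re-discover the store location or stop querying.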
Three categories of exception thrown to the user
The following are the public methods that users call:
- KafkaStreams
- ReadOnlyKeyValueStore(CompositeReadOnlyKeyValueStore)
- get(k)
- range(from, to)
- all()
- approximateNumEntries()
- ReadOnlySessionStore(CompositeReadOnlySessionStore)
- ReadOnlyWindowStore(CompositeReadOnlyWindowStore)
- fetch(k, rf, tt)
- fetch(from, to, rf, tt)
- all()
- fetchAll()
- KeyValueIterator(DelegatingPeekingKeyValueIterator)
- next()
- hasNext()
- peekNextKey()
When any state store exception is thrown while the user calls one of the methods above, we need to check the KafkaStreams state and apply the following rule:
- If the state is RUNNING or REBALANCING
- wrap InvalidStateStoreException in StateStoreRetryableException
- If the state is PENDING_SHUTDOWN, ERROR, or NOT_RUNNING
- wrap InvalidStateStoreException in StateStoreFailException
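The rule above can be sketched as a single mapping from state to exception type. This is only an illustration: the `State` enum is a stand-in listing just the states named in the rule, and the nested exception classes, their constructors, and the `wrap` method are hypothetical rather than the actual Kafka Streams API.

```java
final class StateWrappingSketch {
    private StateWrappingSketch() { }

    // Stand-in for the KafkaStreams states named in the rule above.
    enum State { RUNNING, REBALANCING, PENDING_SHUTDOWN, ERROR, NOT_RUNNING }

    // Stand-ins for the proposed exception classes; constructors are assumptions.
    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static class StateStoreRetryableException extends InvalidStateStoreException {
        StateStoreRetryableException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static class StateStoreFailException extends InvalidStateStoreException {
        StateStoreFailException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Wraps the original exception according to the current state, keeping it as the cause.
    static InvalidStateStoreException wrap(final State state, final InvalidStateStoreException cause) {
        switch (state) {
            case RUNNING:
            case REBALANCING:
                // The instance is still alive, so the caller may try again later.
                return new StateStoreRetryableException("Store unavailable while " + state, cause);
            case PENDING_SHUTDOWN:
            case ERROR:
            case NOT_RUNNING:
                // The instance is shutting down or failed, so retrying cannot succeed.
                return new StateStoreFailException("Store unavailable while " + state, cause);
            default:
                throw new IllegalStateException("Unhandled state: " + state);
        }
    }
}
```

Keeping the original exception as the cause preserves the low-level message (for example, which store was closed) for debugging.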
Call Trace
Call trace 1: KafkaStreams#store()
- KafkaStreams#store() (v)
- QueryableStoreProvider#getStore() (v)
- GlobalStateStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
Call trace 2: ReadOnlyKeyValueStore#get()
- CompositeReadOnlyKeyValueStore#get() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredKeyValueBytesStore#get()
- InnerMeteredKeyValueStore#get()
- CachingKeyValueStore#get()
- AbstractStateStore#validateStoreOpen() (v)
- CachingKeyValueStore#getInternal()
- ChangeLoggingKeyValueBytesStore#get()
- RocksDBStore#get()
- RocksDBStore#validateStoreOpen() (v)
- RocksDBStore#getInternal()
Call trace 3: ReadOnlyKeyValueStore#range()
- CompositeReadOnlyKeyValueStore#range() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply() (v)
- MeteredKeyValueBytesStore#range()
- InnerMeteredKeyValueStore#range()
- CachingKeyValueStore#range()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingKeyValueBytesStore#range()
- RocksDBStore#range()
- RocksDBStore#validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return new MergedSortedCacheKeyValueBytesStoreIterator()
- return new MeteredKeyValueIterator()
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredKeyValueIterator#next()
- MergedSortedCacheKeyValueBytesStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- RocksDBRangeIterator#hasNext()
RocksDBIterator#hasNext() (v)
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- RocksDBRangeIterator#next()
RocksDBIterator#next()
- RocksDbIterator#hasNext() (v)
- RocksDbIterator#getKeyValue()
- RocksIterator#next()
- return keyvalue entry
- return
- return
- return outerkeyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
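Call trace 3 suggests that the underlying store's range() is invoked only from NextIteratorFunction#apply() during hasNext(), so a store exception may surface while iterating rather than when range() returns. A minimal sketch of that lazy behavior follows. `CompositeIterator` is a simplified, hypothetical stand-in and not Kafka's actual CompositeKeyValueIterator.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

final class LazyCompositeSketch {
    private LazyCompositeSketch() { }

    // Chains the iterators of several stores, opening each store's iterator
    // only when the previous one is exhausted.
    static final class CompositeIterator<S, T> implements Iterator<T> {
        private final Iterator<S> stores;
        private final Function<S, Iterator<T>> open; // plays the role of NextIteratorFunction
        private Iterator<T> current = Collections.emptyIterator();

        CompositeIterator(final Iterator<S> stores, final Function<S, Iterator<T>> open) {
            this.stores = stores;
            this.open = open;
        }

        @Override
        public boolean hasNext() {
            // open.apply() runs here, so a closed store is detected during
            // iteration, not when the composite iterator is constructed.
            while (!current.hasNext() && stores.hasNext()) {
                current = open.apply(stores.next());
            }
            return current.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current.next();
        }
    }
}
```

Under this reading, the exception handling proposed above has to cover hasNext() and next() on the returned iterator as well as the store method itself.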
Call trace 4: ReadOnlyKeyValueStore#all()
- CompositeReadOnlyKeyValueStore#all() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext()
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply() (v)
- MeteredKeyValueBytesStore#all()
- InnerMeteredKeyValueStore#all()
- CachingKeyValueStore#all()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingKeyValueBytesStore#all()
- RocksDBStore#all()
- RocksDBStore#validateStoreOpen() (v)
- return new RocksDBIterator()
- return
- return new MergedSortedCacheKeyValueBytesStoreIterator()
- return new MeteredKeyValueIterator()
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredKeyValueIterator#next()
- MergedSortedCacheKeyValueBytesStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- MemoryLRUCacheBytesIterator.hasNext()
- DelegatingPeekingKeyValueIterator.hasNext() (v)
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- return
- return
- return outerkeyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
Call trace 5: ReadOnlyKeyValueStore#approximateNumEntries()
- CompositeReadOnlyKeyValueStore#approximateNumEntries() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredKeyValueBytesStore#approximateNumEntries()
- InnerMeteredKeyValueStore#approximateNumEntries()
- CachingKeyValueStore#approximateNumEntries()
- AbstractStateStore#validateStoreOpen() (v)
- RocksDBStore#approximateNumEntries()
- RocksDBStore#validateStoreOpen() (v)
- return value
- return
- return
- return
- return total
Call trace 6: ReadOnlySessionStore#fetch(key)
- CompositeReadOnlySessionStore#fetch(key) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredSessionStore#fetch(key)
- MeteredSessionStore#findSessions()
- CachingSessionStore#findSessions()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingSessionBytesStore#findSessions()
- RocksDBSessionStore.findSessions(k)
- RocksDBSessionStore.findSessions(from, to)
- RocksDBSegmentedBytesStore#fetch()
- SessionKeySchema#segmentsToSearch()
- Segments#segments()
- Segments#getSegment()
- ConcurrentHashMap#get()
- Segments#isSegment()
- return segment
- return segments
- return
- return new SegmentIterator()
- return new WrappedSessionStoreIterator()
- return
- return
- return new MergedSortedCacheSessionStoreIterator()
- return new MeteredWindowedKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheSessionStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)
- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- return
- return iterator(MeteredWindowedKeyValueIterator)
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheSessionStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)
- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- MergedSortedCacheSessionStoreIterator#nextStoreValue()
AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedSessionStoreIterator#next()
- SegmentIterator#next()
- RocksDBRangeIterator#next()
RocksDbIterator#next()
- RocksDbIterator#getKeyValue()
- RocksIterator#next()
- return entry
- return
- return
- return
- return
- return
Call trace 7: ReadOnlySessionStore#fetch(from, to)
- CompositeReadOnlySessionStore#fetch(from, to) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply(store)
- MeteredSessionStore#fetch(from, to)
- MeteredSessionStore#findSessions(from, to, 0, t)
- CachingSessionStore#findSessions(from, to, 0, t)
- CachingSessionStore#validateStoreOpen()
AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingSessionBytesStore#isOpen()
- RocksDBSessionStore#isOpen()
AbstractStateStore#isOpen()
- RocksDBSegmentedBytesStore#isOpen()
- return
- return
- return
- ChangeLoggingSessionBytesStore#findSessions()
- RocksDBSessionStore#findSessions()
- RocksDBSegmentedBytesStore#fetch()
- SessionKeySchema#segmentsToSearch()
- Segments#segments() (v)
- return new SegmentIterator()
- return new WrappedSessionStoreIterator()
- return
- return
- return new MergedSortedCacheSessionStoreIterator()
- return new MeteredWindowedKeyValueIterator()
- return
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheSessionStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)
- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- return
- DelegatingPeekingKeyValueIterator#next()
Changed Classes
// KafkaStreams#store()
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    return queryableStoreProvider.getStore(storeName, queryableStoreType);
}
// QueryableStoreProvider#getStore()
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
    if (!globalStore.isEmpty()) {
        return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName);
    }
    final List<T> allStores = new ArrayList<>();
    for (StateStoreProvider storeProvider : storeProviders) {
        allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
    }
    if (allStores.isEmpty()) {
        throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    return queryableStoreType.create(
        new WrappingStoreProvider(storeProviders),
        storeName);
}
// GlobalStateStoreProvider#stores()
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final StateStore store = globalStateStores.get(storeName);
    if (store == null || !queryableStoreType.accepts(store)) {
        return Collections.emptyList();
    }
    if (!store.isOpen()) {
        throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
    }
    return (List<T>) Collections.singletonList(store);
}
// StreamThreadStateStoreProvider#stores()
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
            streamThread.state() + ", not RUNNING");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
                    " because the store is not open. The state store may have migrated to another instances.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}
// WrappingStoreProvider#stores()
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
    final List<T> allStores = new ArrayList<>();
    for (StateStoreProvider provider : storeProviders) {
        final List<T> stores =
            provider.stores(storeName, type);
        allStores.addAll(stores);
    }
    if (allStores.isEmpty()) {
        throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    return allStores;
}
// CompositeReadOnlyKeyValueStore#get()
public V get(final K key) {
    Objects.requireNonNull(key);
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    for (ReadOnlyKeyValueStore<K, V> store : stores) {
        try {
            final V result = store.get(key);
            if (result != null) {
                return result;
            }
        } catch (InvalidStateStoreException e) {
            throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
        }
    }
    return null;
}
// CompositeReadOnlyKeyValueStore#range()
public KeyValueIterator<K, V> range(final K from, final K to) {
    Objects.requireNonNull(from);
    Objects.requireNonNull(to);
    final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
        @Override
        public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
            try {
                return store.range(from, to);
            } catch (InvalidStateStoreException e) {
                throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    };
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
}
// CompositeReadOnlyKeyValueStore#all()
public KeyValueIterator<K, V> all() {
    final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
        @Override
        public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
            try {
                return store.all();
            } catch (InvalidStateStoreException e) {
                throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    };
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
}
// CompositeReadOnlyKeyValueStore#approximateNumEntries()
public long approximateNumEntries() {
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    long total = 0;
    for (ReadOnlyKeyValueStore<K, V> store : stores) {
        total += store.approximateNumEntries();
        // A negative total means the sum overflowed; report the maximum instead.
        if (total < 0) {
            return Long.MAX_VALUE;
        }
    }
    return total;
}
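The overflow guard in approximateNumEntries() can be shown in isolation. The sketch below uses a hypothetical `saturatingSum` helper and assumes every per-store count is non-negative, so that an overflow always shows up as a negative running total.

```java
final class SaturatingSumSketch {
    private SaturatingSumSketch() { }

    // Sums non-negative counts, returning Long.MAX_VALUE once the running
    // total overflows (which makes it wrap to a negative value).
    static long saturatingSum(final long... counts) {
        long total = 0;
        for (final long count : counts) {
            total += count;
            if (total < 0) {
                return Long.MAX_VALUE;
            }
        }
        return total;
    }
}
```

For example, `saturatingSum(1, 2, 3)` yields 6, while `saturatingSum(Long.MAX_VALUE, 1)` yields `Long.MAX_VALUE` instead of a negative number.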
// AbstractStateStore#validateStoreOpen()
void validateStoreOpen() {
    if (!innerState.isOpen()) {
        throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed.");
    }
}
// RocksDBStore#validateStoreOpen()
private void validateStoreOpen() {
    if (!open) {
        throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
    }
}
// RocksDBIterator#hasNext()
public synchronized boolean hasNext() {
    if (!open) {
        throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
    }
    return iter.isValid();
}
// DelegatingPeekingKeyValueIterator#hasNext()
public synchronized boolean hasNext() {
    if (!open) {
        throw new InvalidStateStoreException(String.format("Store %s has closed", storeName));
    }
    if (next != null) {
        return true;
    }
    if (!underlying.hasNext()) {
        return false;
    }
    next = underlying.next();
    return true;
}