THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Currently, IQ throws InvalidStateStoreException for every type of error, which means a user cannot handle different types of errors differently. Because of that, we should throw a distinct exception for each type of error.
Proposed Changes
Three categories of exceptions exposed to the user
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) { validateIsRunning(); try { return queryableStoreProvider.getStore(storeName, queryableStoreType); } catch (InvalidStateStoreException e) { if (state==State.RUNNING || state==State.REBALANCING) { if (e instanceof StateStoreClosedException) throw new StateStoreRetryableException(e); else // e instanceof StateStoreMigratedException throw e; } else { // state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING throw new StateStoreFailException(e); } } } |
Call Trace 2: CompositeReadOnlyKeyValueStore#get()
CompositeReadOnlyKeyValueStore#get()
==> WrappingStoreProvider#stores()
==> StreamThreadStateStoreProvider#stores()
==> MeteredKeyValueBytesStore#get()
==> InnerMeteredKeyValueStore#get()
==> CachingKeyValueStore#get()
==> WrappedStateStore.AbstractStateStore#validateStoreOpen()
==> RocksDBStore#isOpen()
==> CachingKeyValueStore#getInternal()
==> ChangeLoggingKeyValueBytesStore#get()
==> RocksDBStore#get()
==> RocksDBStore#validateStoreOpen()
==> RocksDBStore#getInternal()
...
()
...
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) { final List<T> allStores = new ArrayList<>(); for (StateStoreProvider provider : storeProviders) { final List<T> stores = provider.stores(storeName, type); allStores.addAll(stores); } if (allStores.isEmpty()) { // Before: throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); } return allStores; } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
private void validateStoreOpen() { if (!openinnerState.isOpen()) { // Before: throw new InvalidStateStoreException("Store " + thisinnerState.name() + " is currently closed."); throw new StateStoreClosedException("Store " + thisinnerState.name() + " is currently closed."); } } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
private void validateStoreOpen() { if (!innerState.isOpen()open) { // Before: throw new InvalidStateStoreException("Store " + innerStatethis.name() + " is currently closed."); throw new StateStoreClosedException("Store " + innerStatethis.name() + " is currently closed."); } } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public V get(final K key) { Objects.requireNonNull(key); final List<ReadOnlyKeyValueStore<K, V>> stores; try { try { stores = storeProvider.stores(storeName, storeType); } catch (StateStoreClosedException e) { if (streams.state()== KafkaStreams.State.RUNNING || streams.state()== KafkaStreams.State.REBALANCING) throw new StateStoreRetryableException(e); else throw e; } for (ReadOnlyKeyValueStore<K, V> store : stores) { try { final V result = store.get(key); if (result != null) { return result; } } catch (StateStoreClosedException e) { // Before: throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); throw new StateStoreMigratedException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); } } } catch (InvalidStateStoreException e) { if (streams.state()== KafkaStreams.State.PENDING_SHUTDOWN || streams.state()== KafkaStreams.State.ERROR || streams.state()==KafkaStreams.State.NOT_RUNNING) { throw new StateStoreFailException(e); } else throw e; } return null; } |
...