THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
public class StateStoreMigratedException extends InvalidStateStoreException public class StateStoreRetryableException extends InvalidStateStoreException public class StateStoreFailException extends InvalidStateStoreException |
Call Trace 1: KafkaStreams#store()
KafkaStreams#store()
==> QueryableStoreProvider#getStore()
==> GlobalStateStore#stores()
==> StreamThreadStateStoreProvider#stores()
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
final StateStore store = globalStateStores.get(storeName);
if (store == null || !queryableStoreType.accepts(store)) {
return Collections.emptyList();
}
if (!store.isOpen()) {
// Before:
// throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
throw new StateStoreClosedException("the state store, " + storeName + ", is not open.");
}
return (List<T>) Collections.singletonList(store);
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } if (!streamThread.isRunningAndNotRebalancing()) { // Before: // throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); } final List<T> stores = new ArrayList<>(); for (Task streamTask : streamThread.tasks().values()) { final StateStore store = streamTask.getStore(storeName); if (store != null && queryableStoreType.accepts(store)) { if (!store.isOpen()) { // Before: // throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); } stores.add((T) store); } } return stores; } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) { validateIsRunning(); try { return queryableStoreProvider.getStore(storeName, queryableStoreType); } catch (InvalidStateStoreException e) { if (state==State.RUNNING || state==State.REBALANCING) { if (e instanceof StateStoreClosedException) throw new StateStoreRetryableException(e); else // e instanceof StateStoreMigratedException throw e; } else { // state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING throw new StateStoreFailException(e); } } } |
Call Trace 2: CompositeReadOnlyKeyValueStore#get()
CompositeReadOnlyKeyValueStore#get()
==> WrappingStoreProvider#stores()
==> StreamThreadStateStoreProvider#stores()
==> MeteredKeyValueBytesStore#get()
==> InnerMeteredKeyValueStore#get()
==> CachingKeyValueStore#get()
==> WrappedStateStore.AbstractStateStore#validateStoreOpen()
==> RocksDBStore#isOpen()
==> CachingKeyValueStore#getInternal()
==> ChangeLoggingKeyValueBytesStore#get()
==> RocksDBStore#get()
==> RocksDBStore#validateStoreOpen()
==> RocksDBStore#getInternal()
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreTypetype) { final StateStoreList<T> storeallStores = new globalStateStores.getArrayList<>(storeName); if (store == null || !queryableStoreType.accepts(store)) { for (StateStoreProvider provider : storeProviders) { final List<T> stores = provider.stores(storeName, type); return CollectionsallStores.emptyListaddAll(stores); } if (!storeallStores.isOpenisEmpty()) { // Before: // Before: throw new InvalidStateStoreException("the state store, " + storeName + ", is not open may have migrated to another instance."); throw new StateStoreClosedExceptionStateStoreMigratedException("the state store, " + storeName + ", is not open may have migrated to another instance."); } return (List<T>) Collections.singletonList(store)allStores; } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
publicprivate <T>void List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreTypevalidateStoreOpen() { if (!open) { if (streamThread.state() == StreamThread.State.DEAD) { // Before: throw new InvalidStateStoreException("Store " + this.name + " is currently closed"); throw new StateStoreClosedException("Store " return+ Collections.emptyList(this.name + " is currently closed"); } } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
void validateStoreOpen() { if (!streamThreadinnerState.isRunningAndNotRebalancingisOpen()) { // Before: throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed."); // throw new InvalidStateStoreExceptionStateStoreClosedException("theStore state" store, "+ innerState.name() + " storeNameis +currently closed."); } } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public V get(final K key) { Objects.requireNonNull(key); final List<ReadOnlyKeyValueStore<K, V>> stores; try {", may have migrated to another instance."); throwtry new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); { stores = storeProvider.stores(storeName, storeType); } catch (StateStoreClosedException e) { final List<T> stores = new ArrayList<>( if (streams.state()== KafkaStreams.State.RUNNING || streams.state()== KafkaStreams.State.REBALANCING) throw new StateStoreRetryableException(e); else throw e; } for (Task streamTaskReadOnlyKeyValueStore<K, V> store : streamThread.tasks().values())stores) { try { final StateStoreV storeresult = streamTaskstore.getStoreget(storeNamekey); if (storeresult != null && queryableStoreType.accepts(store)) { if (!store.isOpen()) { return result; } } catch (StateStoreClosedException // Before:e) { // Before: throw new InvalidStateStoreException("the stateState store, "is +not storeNameavailable +anymore ",and may have been migrated to another instance; please re-discover its location from the state metadata."); throw new StateStoreMigratedException("the stateState store, "is +not storeNameavailable +anymore ",and may have been migrated to another instance; please re-discover its location from the state metadata."); } } } stores.add((T) storecatch (InvalidStateStoreException e) { if (streams.state()== KafkaStreams.State.PENDING_SHUTDOWN || streams.state()== KafkaStreams.State.ERROR || streams.state()==KafkaStreams.State.NOT_RUNNING) { throw new StateStoreFailException(e); } else throw e; } return storesnull; } |
...