THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Call Trace 1: KafkaStreams#store()
Expand | ||
---|---|---|
| ||
|
...
|
...
|
Call Trace 2: CompositeReadOnlyKeyValueStore#get()
Expand | ||
---|---|---|
| ||
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T>T storesstore(final String storeName, final QueryableStoreType<T> queryableStoreType) { validateIsRunning(); try { final StateStore store =return globalStateStoresqueryableStoreProvider.getgetStore(storeName, queryableStoreType); } catch (InvalidStateStoreException e) { if (store state==State.RUNNING null || !queryableStoreType.accepts(store)state==State.REBALANCING) { return Collections.emptyList(); if (e instanceof StateStoreClosedException) } throw ifnew StateStoreRetryableException(!store.isOpen()) { e); else // Before: e instanceof StateStoreMigratedException // throw new InvalidStateStoreException("the state store, " + storeName + ", is not open."); e; } else { // throw new StateStoreClosedException("the state store, " + storeName + ", is not open."state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING throw new StateStoreFailException(e); } return (List<T>) Collections.singletonList(store);} } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T>T storesgetStore(final String storeName, final QueryableStoreType<T> queryableStoreType) { if (streamThread.state() == StreamThread.State.DEADfinal List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType); if (!globalStore.isEmpty()) { return queryableStoreType.create(streams, new WrappingStoreProvider(Collections.emptyList(<StateStoreProvider>singletonList(globalStoreProvider)), storeName); } if (!streamThread.isRunningAndNotRebalancing()) {final List<T> allStores = new ArrayList<>(); for (StateStoreProvider storeProvider : storeProviders) { allStores.addAll(storeProvider.stores(storeName, queryableStoreType)); } // Before: if (allStores.isEmpty()) { // throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); } final List<T> stores =return queryableStoreType.create(streams, new ArrayList<>WrappingStoreProvider(storeProviders);, for (Task streamTask : streamThread.tasks().values()) { storeName); } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { final StateStore store = streamTaskglobalStateStores.getStoreget(storeName); if (store !== null &&|| !queryableStoreType.accepts(store)) { return Collections.emptyList(); } if (!store.isOpen()) { // Before: // throw new InvalidStateStoreException("the state store, " + storeName + ", mayis have migrated to another instancenot open."); throw new StateStoreMigratedExceptionStateStoreClosedException("the state store, " + storeName + ", mayis have migrated to another instancenot open."); } return stores.add((T) store); } } return stores(List<T>) Collections.singletonList(store); } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> TList<T> storestores(final String storeName, final QueryableStoreType<T> queryableStoreType) { if validateIsRunning(streamThread.state(); == try StreamThread.State.DEAD) { return queryableStoreProviderCollections.getStoreemptyList(storeName, queryableStoreType); } catch if (InvalidStateStoreException e!streamThread.isRunningAndNotRebalancing()) { if (state==State.RUNNING || state==State.REBALANCING) { // Before: // throw new InvalidStateStoreException("the state ifstore, (e" instanceof+ StateStoreClosedException) storeName + ", may have migrated to another instance."); throw new StateStoreRetryableException(e); else // e instanceof StateStoreMigratedException throw e;StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); } final List<T> stores = new ArrayList<>(); for (Task streamTask : streamThread.tasks().values()) { }final elseStateStore { store = streamTask.getStore(storeName); if (store // state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING!= null && queryableStoreType.accepts(store)) { throw new StateStoreFailException(e);if (!store.isOpen()) { } } } |
Call Trace 2: CompositeReadOnlyKeyValueStore#get()
...
// Before: |
...
|
...
|
...
|
...
// |
...
|
...
throw new InvalidStateStoreException("the state store, |
...
" + storeName + ", may have migrated to another instance."); |
...
|
...
throw |
...
new |
...
StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); |
...
|
...
} |
...
stores.add((T) store); } } |
...
return stores;
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) { final List<T> allStores = new ArrayList<>(); for (StateStoreProvider provider : storeProviders) { final List<T> stores = provider.stores(storeName, type); allStores.addAll(stores); } if (allStores.isEmpty()) { // Before: throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); } return allStores; } |
...