THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } if (!streamThread.isRunningAndNotRebalancing()) { // TODO: Replace with StateStoreMigratedExceptionStateStoreRetryableException throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + streamThread.state() + ", not RUNNING"); } final List<T> stores = new ArrayList<>(); for (Task streamTask : streamThread.tasks().values()) { final StateStore store = streamTask.getStore(storeName); if (store != null && queryableStoreType.accepts(store)) { if (!store.isOpen()) { // TODO: Replace with StateStoreClosedException throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask + " because the store is not open. The state store may have migrated to another instances."); } stores.add((T) store); } } return stores; } |
...