Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleclass StreamThreadStateStoreProvider
 public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        // TODO: Replace with StateStoreMigratedExceptionStateStoreRetryableException
        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
                streamThread.state() + ", not RUNNING");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                // TODO: Replace with StateStoreClosedException
                throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
                        " because the store is not open. The state store may have migrated to another instances.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}

...