Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException

 

 

Call Trace 1: KafkaStreams#store()

KafkaStreams#store()
==> QueryableStoreProvider#getStore()
==> GlobalStateStore#stores()
==> StreamThreadStateStoreProvider#stores()
Code Block
languagejava
titleGlobalStateStoreProvider#stores()
collapsetrue
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final StateStore store = globalStateStores.get(storeName);
    if (store == null || !queryableStoreType.accepts(store)) {
        return Collections.emptyList();
    }
    if (!store.isOpen()) {
        // Before:
        //   throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
        throw new StateStoreClosedException("the state store, " + storeName + ", is not open.");
    }
    return (List<T>) Collections.singletonList(store);
}
Code Block
languagejava
titleStreamThreadStateStoreProvider#stores()
collapsetrue
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        // Before: 
        //   throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
        throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                // Before:
                //   throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
                throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}
Code Block
languagejava
titleKafkaStreams#store()
collapsetrue
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    try {
        return queryableStoreProvider.getStore(storeName, queryableStoreType);
    } catch (InvalidStateStoreException e) {
        if (state==State.RUNNING || state==State.REBALANCING) {
            if (e instanceof StateStoreClosedException)
                throw new StateStoreRetryableException(e);
            else // e instanceof StateStoreMigratedException
                throw e;
        } else {
            // state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING
            throw new StateStoreFailException(e);
        }
    }
}

 

 

 

 

Call Trace 2: CompositeReadOnlyKeyValueStore#get()

CompositeReadOnlyKeyValueStore#get()
==> WrappingStoreProvider#stores()
==> StreamThreadStateStoreProvider#stores()
==> MeteredKeyValueBytesStore#get()
==> InnerMeteredKeyValueStore#get()
==> CachingKeyValueStore#get()
==> WrappedStateStore.AbstractStateStore#validateStoreOpen()
==> RocksDBStore#isOpen()
==> CachingKeyValueStore#getInternal()
==> ChangeLoggingKeyValueBytesStore#get()
==> RocksDBStore#get()
==> RocksDBStore#validateStoreOpen()
==> RocksDBStore#getInternal()

 

 

 

Code Block
languagejava
titleCompositeReadOnlyKeyValueStore#get()
collapsetrue
 

 

 

 

Code Block
languagejava
titleWrappingStoreProvider#storesGlobalStoreProvider#stores()
collapsetrue
    public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreTypetype) {
        final StateStoreList<T> storeallStores = new globalStateStores.getArrayList<>(storeName);
        if (store == null || !queryableStoreType.accepts(store)) {
for (StateStoreProvider provider : storeProviders) {
            final List<T> stores =
                provider.stores(storeName, type);
           return CollectionsallStores.emptyListaddAll(stores);
        }
        if (!storeallStores.isOpenisEmpty()) {
        // Before:
        //  Before: throw new InvalidStateStoreException("the state store, " + storeName + ", is not open may have migrated to another instance.");
            throw new StateStoreClosedExceptionStateStoreMigratedException("the state store, " + storeName + ", is not open may have migrated to another instance.");
        }
        return (List<T>) Collections.singletonList(store)allStores;
    }
Code Block
languagejava
titleStreamThreadStateStoreProvider#storesRocksDBStore#validateStoreOpen()
collapsetrue
publicprivate <T>void List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreTypevalidateStoreOpen() {
    if (!open) {
    if (streamThread.state() == StreamThread.State.DEAD) {
    // Before: throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
        throw new StateStoreClosedException("Store " return+ Collections.emptyList(this.name + " is currently closed");
    }
}
Code Block
languagejava
titleWrappedStateStore.AbstractStateStore#validateStoreOpen()
collapsetrue
void validateStoreOpen() {
    if (!streamThreadinnerState.isRunningAndNotRebalancingisOpen()) {
        // Before: 
    throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed.");
     //   throw new InvalidStateStoreExceptionStateStoreClosedException("theStore state" store, "+ innerState.name() + " storeNameis +currently closed.");
    }
}
Code Block
languagejava
titleCompositeReadOnlyKeyValueStore#get()
collapsetrue
public V get(final K key) {
    Objects.requireNonNull(key);
    final List<ReadOnlyKeyValueStore<K, V>> stores;
    try {", may have migrated to another instance.");
        throwtry new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
{
            stores = storeProvider.stores(storeName, storeType);
        } catch (StateStoreClosedException e) {
    final List<T> stores = new ArrayList<>(        if (streams.state()== KafkaStreams.State.RUNNING || streams.state()== KafkaStreams.State.REBALANCING)
                throw new StateStoreRetryableException(e);
            else
                throw e;
        }

        for (Task streamTaskReadOnlyKeyValueStore<K, V> store : streamThread.tasks().values())stores) {
            try {
                final StateStoreV storeresult = streamTaskstore.getStoreget(storeNamekey);
                if (storeresult != null && queryableStoreType.accepts(store)) {
                   if (!store.isOpen()) {
  return result;
                }
            } catch (StateStoreClosedException // Before:e) {
                //  Before: throw new InvalidStateStoreException("the stateState store, "is +not storeNameavailable +anymore ",and may have been migrated to another instance; please re-discover its location from the state metadata.");
                throw new StateStoreMigratedException("the stateState store, "is +not storeNameavailable +anymore ",and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    }  stores.add((T) storecatch (InvalidStateStoreException e) {
        if (streams.state()== KafkaStreams.State.PENDING_SHUTDOWN
                || streams.state()== KafkaStreams.State.ERROR
                || streams.state()==KafkaStreams.State.NOT_RUNNING) {
            throw new StateStoreFailException(e);
        } else
            throw e;
    }

    return storesnull;
}

 

 

 

...