Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Currently, IQ throws InvalidStateStoreException for any types of error, that means a user cannot handle different types of error. Because of that, we should throw different exceptions for each type.

 

Proposed Changes

Three categories exception to user

...

Code Block
languagejava
titleKafkaStreams#store()
collapsetrue
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    try {
        return queryableStoreProvider.getStore(storeName, queryableStoreType);
    } catch (InvalidStateStoreException e) {
        if (state==State.RUNNING || state==State.REBALANCING) {
            if (e instanceof StateStoreClosedException)
                throw new StateStoreRetryableException(e);
            else // e instanceof StateStoreMigratedException
                throw e;
        } else {
            // state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING
            throw new StateStoreFailException(e);
        }
    }
}

 

 

 

 

Call Trace 2: CompositeReadOnlyKeyValueStore#get()

CompositeReadOnlyKeyValueStore#get()
==> WrappingStoreProvider#stores()
==> StreamThreadStateStoreProvider#stores()
==> MeteredKeyValueBytesStore#get()
==> InnerMeteredKeyValueStore#get()
==> CachingKeyValueStore#get()
==> WrappedStateStore.AbstractStateStore#validateStoreOpen()
==> RocksDBStore#isOpen()
==> CachingKeyValueStore#getInternal()
==> ChangeLoggingKeyValueBytesStore#get()
==> RocksDBStore#get()
==> RocksDBStore#validateStoreOpen()
==> RocksDBStore#getInternal()

 

 

 

...

()

...

 

 

 

...

Code Block
languagejava
titleWrappingStoreProvider#stores()
collapsetrue
    public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
        final List<T> allStores = new ArrayList<>();
        for (StateStoreProvider provider : storeProviders) {
            final List<T> stores =
                provider.stores(storeName, type);
            allStores.addAll(stores);
        }
        if (allStores.isEmpty()) {
            // Before: throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
            throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
        }
        return allStores;
    }
Code Block
languagejava
titleRocksDBStore#validateStoreOpenWrappedStateStore.AbstractStateStore#validateStoreOpen()
collapsetrue
private void validateStoreOpen() {
    if (!openinnerState.isOpen()) {
        // Before: throw new InvalidStateStoreException("Store " + thisinnerState.name() + " is currently closed.");
        throw new StateStoreClosedException("Store " + thisinnerState.name() + " is currently closed.");
    }
}
Code Block
languagejava
titleWrappedStateStore.AbstractStateStore#validateStoreOpenRocksDBStore#validateStoreOpen()
collapsetrue
private void validateStoreOpen() {
    if (!innerState.isOpen()open) {
        // Before: throw new InvalidStateStoreException("Store " + innerStatethis.name() + " is currently closed.");
        throw new StateStoreClosedException("Store " + innerStatethis.name() + " is currently closed.");
    }
}
Code Block
languagejava
titleCompositeReadOnlyKeyValueStore#get()
collapsetrue
public V get(final K key) {
    Objects.requireNonNull(key);
    final List<ReadOnlyKeyValueStore<K, V>> stores;
    try {
        try {
            stores = storeProvider.stores(storeName, storeType);
        } catch (StateStoreClosedException e) {
            if (streams.state()== KafkaStreams.State.RUNNING || streams.state()== KafkaStreams.State.REBALANCING)
                throw new StateStoreRetryableException(e);
            else
                throw e;
        }

        for (ReadOnlyKeyValueStore<K, V> store : stores) {
            try {
                final V result = store.get(key);
                if (result != null) {
                    return result;
                }
            } catch (StateStoreClosedException e) {
                // Before: throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
                throw new StateStoreMigratedException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    } catch (InvalidStateStoreException e) {
        if (streams.state()== KafkaStreams.State.PENDING_SHUTDOWN
                || streams.state()== KafkaStreams.State.ERROR
                || streams.state()==KafkaStreams.State.NOT_RUNNING) {
            throw new StateStoreFailException(e);
        } else
            throw e;
    }

    return null;
}

...