Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: call trace: KafkaStreams#store()

...

Currently, IQ throws InvalidStateStoreException for any types of error, that means a user cannot handle different types of error. Because of that, we should throw different exceptions for each type.

 

Proposed Changes

Add two new exception classes for the IQ state store errors:

Three categories exception to user

  • StateStoreRetryableException: just wait, plain retry
  • StateStoreMigratedException: need rediscover
  • StateStoreFailException: fatal error, cannot retry or rediscover

 

Code Block
languagejava
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException

...


public class 

...

StateStoreFailException extends InvalidStateStoreException

 

 

Call Trace: KafkaStreams#store()

StateStoreMigratedException can be used to indicate a state store is currently not available and it may have been migrated to another instance.

StateStoreClosedException can be used to indicate a state store is closed.(RocksDBStore, WrappedStateStore)

Compatibility, Deprecation, and Migration Plan

  • Because no classes will be removed, this change will be fully backward.

Rejected Alternatives

KafkaStreams#store()
==> QueryableStoreProvider#getStore()
==> GlobalStateStore#stores()
==> StreamThreadStateStoreProvider#stores()
Code Block
languagejava
titleKafkaStreams#store()
collapsetrue
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    try {
        return queryableStoreProvider.getStore(storeName, queryableStoreType);
    } catch (InvalidStateStoreException e) {
        if (state==State.RUNNING || state==State.REBALANCING) {
            if (e instanceof StateStoreClosedException)
                throw new StateStoreRetryableException(e);
            else // e instanceof StateStoreMigratedException
                throw e;
        } else {
            // state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING
            throw new StateStoreFailException(e);
        }
    }
}
Code Block
languagejava
titleGlobalStoreProvider#stores()
collapsetrue
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final StateStore store = globalStateStores.get(storeName);
    if (store == null || !queryableStoreType.accepts(store)) {
        return Collections.emptyList();
    }
    if (!store.isOpen()) {
        // Before:
        //   throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
        throw new StateStoreClosedException("the state store, " + storeName + ", is not open.");
    }
    return (List<T>) Collections.singletonList(store);
}
Code Block
languagejava
titleStreamThreadStateStoreProvider#stores()
collapsetrue
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        // Before: 
        //   throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
        throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                // Before:
                //   throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
                throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}

 

Call Trace: CompositeReadOnlyKeyValueStore#get()

 

 

 

 

 No