Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: update call trace 1,2,3

...

Code Block
languagejava
public class StateStoreClosedException extends InvalidStateStoreException
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException

 

Call Trace

Expand

...

titleCall

...

trace 1: KafkaStreams#store()

...

Expand
title
Expand...
  • KafkaStreams#store() (v)
    • QueryableStoreProvider#getStore() (v)
      • GlobalStateStore#stores() (v)
      • StreamThreadStateStroeProvider#stores() (v)
Expand
titleCall

...

trace 2: CompositeReadOnlyKeyValueStore#get()
Expand
titleExpand...
  • CompositeReadOnlyKeyValueStore#get(CompositeReadOnlyKeyValueStore#get() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • MeteredKeyValueBytesStore#get()
      • InnerMeteredKeyValueStore#get()
        • CachingKeyValueStore#get()
          • WrappedStateStore.AbstractStateStore#validateStoreOpen() (v)
            • RocksDBStore#isOpen()
        • CachingKeyValueStore#getInternal()
          • ChangeLoggingKeyValueBytesStore#get()
            • RocksDBStore#get()
              • RocksDBStore#validateStoreOpen() (v)
            • RocksDBStore#getInternal()

 

 

 

Code Block
languagejava
titleKafkaStreams#store()
collapsetrue
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    try {
        return queryableStoreProvider.getStore(storeName, queryableStoreType);
    } catch (InvalidStateStoreException e) {
        if (state==State.RUNNING || state==State.REBALANCING) {
            if (e instanceof StateStoreClosedException)
                throw new StateStoreRetryableException(e);
            else // e instanceof StateStoreMigratedException
                throw e;
        } else {
            // state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING
            throw new StateStoreFailException(e);
        }
    }
}
Code Block
languagejava
titleQueryableStoreProvider#getStore()
collapsetrue
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
    if (!globalStore.isEmpty()) {
        return queryableStoreType.create(streams,
                new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)),
                storeName);
    }
    final List<T> allStores = new ArrayList<>();
    for (StateStoreProvider storeProvider : storeProviders) {
        allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
    }
    if (allStores.isEmpty()) {
        // throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
        throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    return queryableStoreType.create(streams,
            new WrappingStoreProvider(storeProviders),
            storeName);
}
Code Block
languagejava
titleGlobalStateStoreProvider#stores()
collapsetrue
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final StateStore store = globalStateStores.get(storeName);
    if (store == null || !queryableStoreType.accepts(store)) {
        return Collections.emptyList();
    }
    if (!store.isOpen()) {
        // Before:
        //   throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
        throw new StateStoreClosedException("the state store, " + storeName + ", is not open.");
    }
    return (List<T>) Collections.singletonList(store);
}
Code Block
languagejava
titleStreamThreadStateStoreProvider#stores()
collapsetrue
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        // Before: 
        //   throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
        throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                // Before:
                //   throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
                throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}
Code Block
languagejava
titleWrappingStoreProvider#stores()
collapsetrue
    public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
        final List<T> allStores = new ArrayList<>();
        for (StateStoreProvider provider : storeProviders) {
            final List<T> stores =
                provider.stores(storeName, type);
            allStores.addAll(stores);
        }
        if (allStores.isEmpty()) {
            // Before: throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
            throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
        }
        return allStores;
    }
Code Block
languagejava
titleWrappedStateStore.AbstractStateStore#validateStoreOpen()
collapsetrue
void validateStoreOpen() {
    if (!innerState.isOpen()) {
        // Before: throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed.");
        throw new StateStoreClosedException("Store " + innerState.name() + " is currently closed.");
    }
}
Code Block
languagejava
titleRocksDBStore#validateStoreOpen()
collapsetrue
private void validateStoreOpen() {
    if (!open) {
        // Before: throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
        throw new StateStoreClosedException("Store " + this.name + " is currently closed");
    }
}
Code Block
languagejava
titleCompositeReadOnlyKeyValueStore#get()
collapsetrue
public V get(final K key) {
    Objects.requireNonNull(key);
    final List<ReadOnlyKeyValueStore<K, V>> stores;
    try {
        try {
            stores = storeProvider.stores(storeName, storeType);
        } catch (StateStoreClosedException e) {
            if (streams.state()== KafkaStreams.State.RUNNING || streams.state()== KafkaStreams.State.REBALANCING)
                throw new StateStoreRetryableException(e);
            else
                throw e;
        }

        for (ReadOnlyKeyValueStore<K, V> store : stores) {
            try {
                final V result = store.get(key);
                if (result != null) {
                    return result;
                }
            } catch (StateStoreClosedException e) {
                // Before: throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
                throw new StateStoreMigratedException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    } catch (InvalidStateStoreException e) {
        if (streams.state()== KafkaStreams.State.PENDING_SHUTDOWN
                || streams.state()== KafkaStreams.State.ERROR
                || streams.state()==KafkaStreams.State.NOT_RUNNING) {
            throw new StateStoreFailException(e);
        } else
            throw e;
    }

    return null;
}

 

 

 

 

 

Expand
titleCall trace 3: CompositeReadOnlyKeyValueStore#range()
  • CompositeReadOnlyKeyValueStore#range() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext()
    • CompositKeyValueIterator#hasNext()
      • NextIteratorFunction#apply()
        • MeteredKeyValueBytesStore#range()
          • InnerMeteredKeyValueStore#range()
            • CachingKeyValueStore#range()
              • AbstractStateStore#validateStoreOpen() (v)
              • ChangeLoggingKeyValueBytesStore#range()
                • RocksDBStore#range()
                  • RocksDBStore#validateStoreOpen() (v)
                  • return new RocksDBRangeIterator()
              • return new MergedSortedCacheKeyValueBytesStoreIterator()
            • return new MeteredKeyValueIterator()
          • return
        • return
    • CompositKeyValueIterator#next()
      • MeteredKeyValueIterator#next()
        • MergedSortedCacheKeyValueBytesStoreIterator#next()
          AbstractMergedSortedCacheStoreIterator#next()
          • RocksDBRangeIterator#hasNext()
            RocksDBIterator#hasNext() (v)
              • RocksIterator#isValid()
          • AbstractMergedStortedCacheStoreIterator#nextSrtoreValue()
            • RocksDBRangeIterator#next()
              RocksDBIterator#next()
              • RocksDBIterator#hasNext() (v)
              • RocksDBIterator#getKeyValue()
              • RocksIterator#next()
              • return keyvalue entry
            • return
          • return
        • return outerkeyvalue
      • return
    • return true
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue

...