Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: update for call trace 1, 2, 3

...

Expand
titleCall trace 3: CompositeReadOnlyKeyValueStore#range()
  • CompositeReadOnlyKeyValueStore#range() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext()
    • CompositKeyValueIterator#hasNext()
      • NextIteratorFunction#apply()
        • MeteredKeyValueBytesStore#range()
          • InnerMeteredKeyValueStore#range()
            • CachingKeyValueStore#range()
              • AbstractStateStore#validateStoreOpen() (v)
              • ChangeLoggingKeyValueBytesStore#range()
                • RocksDBStore#range()
                  • RocksDBStore#validateStoreOpen() (v)
                  • return new RocksDBRangeIterator()
              • return new MergedSortedCacheKeyValueBytesStoreIterator()
            • return new MeteredKeyValueIterator()
          • return
        • return
    • CompositKeyValueIterator#next()
      • MeteredKeyValueIterator#next()
        • MergedSortedCacheKeyValueBytesStoreIterator#next()
          AbstractMergedSortedCacheStoreIterator#next()
          • RocksDBRangeIterator#hasNext()
            RocksDBIterator#hasNext() (v)
              • RocksIterator#isValid()
          • AbstractMergedStortedCacheStoreIterator#nextSrtoreValue()
            • RocksDBRangeIterator#next()
              RocksDBIterator#next()
              • RocksDBIterator#hasNext() (v)
              • RocksDBIterator#getKeyValue()
              • RocksIterator#next()
              • return keyvalue entry
            • return
          • return
        • return outerkeyvalue
      • return
    • return true
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue

 

File changes

Code Block
languagejava
titleKafkaStreams#store()
collapsetrue
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    return queryableStoreProvider.getStore(storeName, queryableStoreType);
}
Code Block
languagejava
titleQueryableStoreProvider#getStore()
collapsetrue
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
    if (!globalStore.isEmpty()) {
        return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName);
    }
    final List<T> allStores = new ArrayList<>();
    for (StateStoreProvider storeProvider : storeProviders) {
        allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
    }
    if (allStores.isEmpty()) {
        throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    return queryableStoreType.create(
            new WrappingStoreProvider(storeProviders),
            storeName);
}
Code Block
languagejava
titleGlobalStateStoreProvider#stores()
collapsetrue
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final StateStore store = globalStateStores.get(storeName);
    if (store == null || !queryableStoreType.accepts(store)) {
        return Collections.emptyList();
    }
    if (!store.isOpen()) {
        throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
    }
    return (List<T>) Collections.singletonList(store);
}
Code Block
languagejava
titleStreamThreadStateStoreProvider#stores()
collapsetrue
 public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}
Code Block
languagejava
titleWrappingStoreProvider#stores()
collapsetrue
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
    final List<T> allStores = new ArrayList<>();
    for (StateStoreProvider provider : storeProviders) {
        final List<T> stores =
            provider.stores(storeName, type);
        allStores.addAll(stores);
    }
    if (allStores.isEmpty()) {
        throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    return allStores;
}
Code Block
languagejava
titleCompositeReadOnlyKeyValueStore#get()
collapsetrue
public V get(final K key) {
    Objects.requireNonNull(key);
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    for (ReadOnlyKeyValueStore<K, V> store : stores) {
        try {
            final V result = store.get(key);
            if (result != null) {
                return result;
            }
        } catch (InvalidStateStoreException e) {
            throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
        }

    }
    return null;
}
Code Block
languagejava
titleAbstractStateStore#validateStoreOpen()
collapsetrue
void validateStoreOpen() {
    if (!innerState.isOpen()) {
        throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed.");
    }
}
Code Block
languagejava
titleRocksDBStore#validateStoreOpen()
collapsetrue
private void validateStoreOpen() {
    if (!open) {
        throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
    }
}
Code Block
languagejava
titleCompositeReadOnlyKeyValueStore#range()
collapsetrue
public KeyValueIterator<K, V> range(final K from, final K to) {
    Objects.requireNonNull(from);
    Objects.requireNonNull(to);
    final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
        @Override
        public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
            try {
                return store.range(from, to);
            } catch (InvalidStateStoreException e) {
                throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    };
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
}
Code Block
languagejava
titleRocksDBIterator#hasNext()
collapsetrue
public synchronized boolean hasNext() {
    if (!open) {
        throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
    }

    return iter.isValid();
}