Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: update for call trace 5

 (Working in Progress)

 

 

Table of Contents

Status

...

Expand
titleCall trace 1: KafkaStreams#store()
  • KafkaStreams#store() (v)
    • QueryableStoreProvider#getStore() (v)
      • GlobalStateStore#stores() (v)
      • StreamThreadStateStroeProvider#stores() (v)
Expand
titleCall trace 2: CompositeReadOnlyKeyValueStore#getReadOnlyKeyValueStore#get()
  • CompositeReadOnlyKeyValueStore#get() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • MeteredKeyValueBytesStore#get()
      • InnerMeteredKeyValueStore#get()
        • CachingKeyValueStore#get()
          • AbstractStateStore#validateStoreOpen() (v)
            • RocksDBStore#isOpen()
        • CachingKeyValueStore#getInternal()
          • ChangeLoggingKeyValueBytesStore#get()
            • RocksDBStore#get()
              • RocksDBStore#validateStoreOpen() (v)
            • RocksDBStore#getInternal()
Expand
titleCall trace 3: CompositeReadOnlyKeyValueStore#rangeReadOnlyKeyValueStore#range()
  • CompositeReadOnlyKeyValueStore#range() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext() (v)
    • CompositKeyValueIterator#hasNext()
      • NextIteratorFunction#apply() (v)
        • MeteredKeyValueBytesStore#range()
          • InnerMeteredKeyValueStore#range()
            • CachingKeyValueStore#range()
              • AbstractStateStore#validateStoreOpen() (v)
              • ChangeLoggingKeyValueBytesStore#range()
                • RocksDBStore#range()
                  • RocksDBStore#validateStoreOpen() (v)
                  • return new RocksDBRangeIterator()
              • return new MergedSortedCacheKeyValueBytesStoreIterator()
            • return new MeteredKeyValueIterator()
          • return
        • return
      • return
    • CompositKeyValueIterator#next()
      • MeteredKeyValueIterator#next()
        • MergedSortedCacheKeyValueBytesStoreIterator#next()
          AbstractMergedSortedCacheStoreIterator#next()
          • RocksDBRangeIterator#hasNext()
            RocksDBIterator#hasNext() (v)
              • RocksIterator#isValid()
          • AbstractMergedStortedCacheStoreIterator#nextSrtoreValue()
            • RocksDBRangeIterator#next()
              RocksDBIterator#next()
              • RocksDbIterator#hasNext() (v)
              • RocksDbIterator#getKeyValue()
              • RocksIterator#next()
              • return keyvalue entry
            • return
          • return
        • return outerkeyvalue
      • return
    • return
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue
Expand
titleCall trace 4: CompositeReadOnlyKeyValueStore#allReadOnlyKeyValueStore#all()
  • CompositeReadOnlyKeyValueStore#rangeCompositeReadOnlyKeyValueStore#all() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext()
    • CompositKeyValueIterator#hasNext()
      • NextIteratorFunction#apply() (v)
        • MeteredKeyValueBytesStore#all()
          • InnerMeteredKeyValueStore#all()
            CachingKeyValueStore#all()
            • AbstractStateStore#validateStoreOpen() (v)
              • ChangeLoggingKeyValueBytesStore#all()
                • RocksDBStore#all()
                  • RocksDBStore#validateStoreOpen() (v)
                  • return new RocksDBIterator()
                • return
              • return new MergedSortedCacheKeyValueBytesStoreIterator()
            • return new MeteredKeyValueIterator()
          • return
        • return
      • return
    • CompositKeyValueIterator#next()
      • MeteredKeyValueIterator#next()
        • MergedSortedCacheKeyValueBytesStoreIterator#next()
          AbstractMergedSortedCacheStoreIterator#next()
          • MemoryLRUCacheBytesIterator.hasNext()
          • DelegatingPeekingKeyValueIterator.hasNext() (v)
          • AbstractMergedSortedCacheStoreIterator#nextStoreValue()
            • DelegatingPeekingKeyValueIterator#next()
              • DelegatingPeekingKeyValueIterator#hasNext() (v)
            • return
          • return
        • return outerkeyvalue
      • return
    • return
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue

 

...

Expand

 

File changes

titleCall trace 5: ReadOnlyKeyValueStore#approximateNumEntries()
  • CompositeReadOnlyKeyValueStore#approximateNumEntries() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • MeteredKeyValueBytesStore#approximateNumEntries()
      • InnerMeteredKeyValueStore#approximateNumEntries()
        • CachingKeyValueStore#approximateNumEntries()
          • AbstractStateStore#validateStoreOpen() (v)
            • RocksDBStore#isOpen()
          • RocksDBStore#approximateNumEntries()
            • RocksDBStore#validateStoreOpen() (v)
            • return value
          • return
        • return
      • return
    • return total

 

Changed Classes

Code Block
languagejava
titleKafkaStreams#store()
collapse
Code Block
languagejava
titleKafkaStreams#store()
collapsetrue
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    return queryableStoreProvider.getStore(storeName, queryableStoreType);
}

...

Code Block
languagejava
titleCompositeReadOnlyKeyValueStore#get()CompositeReadOnlyKeyValueStore
collapsetrue
public V get(final K key) {
    Objects.requireNonNull(key);
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    for (ReadOnlyKeyValueStore<K, V> store : stores) {
        try {
            final V result = store.get(key);
            if (result != null) {
                return result;
            }
        } catch (InvalidStateStoreException e) {
            throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
        }

    }
    return null;
}
Code Block
languagejava
titleCompositeReadOnlyKeyValueStore#range()
collapsetrue

 
public KeyValueIterator<K, V> range(final K from, final K to) {
    Objects.requireNonNull(from);
    Objects.requireNonNull(to);
    final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
        @Override
        public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
            try {
                return store.range(from, to);
            } catch (InvalidStateStoreException e) {
                throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    };
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
}
Code Block
languagejava
titleCompositeReadOnlyKeyValueStore#all()
collapsetrue


public KeyValueIterator<K, V> all() {
    final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
        @Override
        public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
            try {
                return store.all();
            } catch (InvalidStateStoreException e) {
                throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    };
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
}
 
public long approximateNumEntries() {
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    long total = 0;
    for (ReadOnlyKeyValueStore<K, V> store : stores) {
        total += store.approximateNumEntries();
        if (total < 0) {
            return Long.MAX_VALUE;
        }
    }
    return total;
}
Code Block
languagejava
titleAbstractStateStore#validateStoreOpen()
collapsetrue
void validateStoreOpen() {
    if (!innerState.isOpen()) {
        throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed.");
    }
}

...