Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: update trace call 4

 

 

 

Table of Contents

Status

Current stateUnder Discussion

...

Expand
titleCall trace 3: CompositeReadOnlyKeyValueStore#range()
  • CompositeReadOnlyKeyValueStore#range() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext() (v)
    • CompositKeyValueIterator#hasNext()
      • NextIteratorFunction#apply() (v)
        • MeteredKeyValueBytesStore#range()
          • InnerMeteredKeyValueStore#range()
            • CachingKeyValueStore#range()
              • AbstractStateStore#validateStoreOpen() (v)
              • ChangeLoggingKeyValueBytesStore#range()
                • RocksDBStore#range()
                  • RocksDBStore#validateStoreOpen() (v)
                  • return new RocksDBRangeIterator()
              • return new MergedSortedCacheKeyValueBytesStoreIterator()
            • return new MeteredKeyValueIterator()
          • return
        • return
      • return
    • CompositKeyValueIterator#next()
      • MeteredKeyValueIterator#next()
        • MergedSortedCacheKeyValueBytesStoreIterator#next()
          AbstractMergedSortedCacheStoreIterator#next()
          • RocksDBRangeIterator#hasNext()
            RocksDBIterator#hasNext() (v)
              • RocksIterator#isValid()
          • AbstractMergedStortedCacheStoreIterator#nextSrtoreValue()
            • RocksDBRangeIterator#next()
              RocksDBIterator#next()
              • RocksDBIterator#hasNextRocksDbIterator#hasNext() (v)
              • RocksDBIterator#getKeyValueRocksDbIterator#getKeyValue()
              • RocksIterator#next()
              • return keyvalue entry
            • return
          • return
        • return outerkeyvalue
      • return
    • return
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue
Expand
titleCall trace 4: CompositeReadOnlyKeyValueStore#all()
  • CompositeReadOnlyKeyValueStore#range() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext()
    • CompositKeyValueIterator#hasNext()
      • NextIteratorFunction#apply() (v)
        • MeteredKeyValueBytesStore#all()
          • InnerMeteredKeyValueStore#all()
            CachingKeyValueStore#all()
            • AbstractStateStore#validateStoreOpen() (v)
              • ChangeLoggingKeyValueBytesStore#all()
                • RocksDBStore#all()
                  • RocksDBStore#validateStoreOpen() (v)
                  • return new RocksDBIterator()
                • return
              • return new MergedSortedCacheKeyValueBytesStoreIterator()
            • return new MeteredKeyValueIterator()
          • return
        • return
      • return
    • CompositKeyValueIterator#next()
      • MeteredKeyValueIterator#next()
        • MergedSortedCacheKeyValueBytesStoreIterator#next()
          AbstractMergedSortedCacheStoreIterator#next()
          • MemoryLRUCacheBytesIterator.hasNext()
          • DelegatingPeekingKeyValueIterator.hasNext() (v)
          • AbstractMergedSortedCacheStoreIterator#nextStoreValue()
            • DelegatingPeekingKeyValueIterator#next()
              • DelegatingPeekingKeyValueIterator#hasNext() (v)
            • return
          • return
        • return outerkeyvalue
      • return
    • return
  • true
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue

 

 

 

File changes

Code Block
languagejava
titleKafkaStreams#store()
collapsetrue
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    return queryableStoreProvider.getStore(storeName, queryableStoreType);
}

...

Code Block
languagejava
titleAbstractStateStore#validateStoreOpenCompositeReadOnlyKeyValueStore#range()
collapsetrue
void validateStoreOpen(public KeyValueIterator<K, V> range(final K from, final K to) {
    if (!innerState.isOpen()Objects.requireNonNull(from);
    Objects.requireNonNull(to);
    final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
        throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed."@Override
        public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
            try {
                return store.range(from, to);
    }
}
Code Block
languagejava
titleRocksDBStore#validateStoreOpen()
collapsetrue
private void validateStoreOpen(        } catch (InvalidStateStoreException e) {
      if (!open) {
        throw new InvalidStateStoreException("Store " + this.name + " is currently closedState store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    };
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
}
Code Block
languagejava
titleCompositeReadOnlyKeyValueStore#rangeCompositeReadOnlyKeyValueStore#all()
collapsetrue
public KeyValueIterator<K, V> range(final K from, final K toall() {
    Objects.requireNonNull(from);
    Objects.requireNonNull(to);
    final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
        @Override
        public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
            try {
                return store.rangeall(from, to);
            } catch (InvalidStateStoreException e) {
                throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    };
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
}
Code Block
languagejava
titleAbstractStateStore#validateStoreOpen()
collapsetrue
void validateStoreOpen() {
    if (!innerState.isOpen()) {
        throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed.");
    }
}
Code Block
languagejava
titleRocksDBStore#validateStoreOpen()
collapsetrue
private void validateStoreOpen() {
    if (!open) {
        throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
    }
}
Code Block
languagejava
titleRocksDBIterator#hasNext()
collapsetrue
public synchronized boolean hasNext() {
    if (!open) {
        throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
    }

    return iter.isValid();
}
Code Block
languagejava
titleDelegatingPeekingKeyValueIterator#hasNext()
collapsetrue
public synchronized boolean hasNext() {
    if (!open) {
        throw new InvalidStateStoreException(String.format("Store %s has closed", storeName));
    }
    if (next != null) {
        return true;
    }

    if (!underlying.hasNext()) {
        return false;
    }

    next = underlying.next();
    return true;
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

...