Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

There append three new exceptions:

Code Block
languagejava
// public class StateStoreClosedException extends InvalidStateStoreException
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException

...

  • KafkaStreams
    • stores()
  • ReadOnlyKeyValueStore(CompositeReadOnlyKeyValueStore)
    • get(k)
    • range(from, to)
    • all()
    • approximateNumEntries()
  • ReadOnlySessionStore(CompositeReadOnlySessionStore)
    • fetch(k)
    • fetch(from, to)
  • ReadOnlyWindowStore(CompositeReadOnlyWindowStore)
    • fetch(k, rf, tt)
    • fetch(from, to, rf, tt)
    • all()
    • fetchAll()
  • KeyValueIterator(DelegatingPeekingKeyValueIterator)
    • next()
    • hasNext()
    • peekNextKey()

We need During user call one of above methods, we should check KafkaStreams status state by the following rule when any state store exception thrown during user call all of above methods InvalidStateStoreException is thrown:

  • If state is RUNNING or REBALANCING
    • StateStoreClosedException: should be wrapped to StateStoreRetriableException
    • StateStoreMigratedException: should not be wrapped, directly thrownwrap InvalidStateStoreException to StateStoreRetriableException
  • if state is PENDING_SHUTDOW or ERROR or NOT_RUNNING
    • wrap InvalidStateStoreException to StateStoreFailException

...

Expand
titleCall trace 1: KafkaStreams#store()
  • KafkaStreams#store() (v)
    • QueryableStoreProvider#getStore() (v)
      • GlobalStateStore#storesGlobalStateStoreProvider#stores() (v)
      • StreamThreadStateStroeProvider#stores() (v)

...

Expand
titleCall trace 4: ReadOnlyKeyValueStore#all()
  • CompositeReadOnlyKeyValueStore#all() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext() (v)
    • CompositKeyValueIterator#hasNext()
      • NextIteratorFunction#apply() (v)
        • MeteredKeyValueBytesStore#all()
          • InnerMeteredKeyValueStore#all()
            CachingKeyValueStore#all()
            • AbstractStateStore#validateStoreOpen() (v)
              • ChangeLoggingKeyValueBytesStore#all()
                • RocksDBStore#all()
                  • RocksDBStore#validateStoreOpen() (v)
                  • return new RocksDBIterator()
                • return
              • return new MergedSortedCacheKeyValueBytesStoreIterator()
            • return new MeteredKeyValueIterator()
          • return
        • return
      • return
    • CompositKeyValueIterator#next()
      • MeteredKeyValueIterator#next()
        • MergedSortedCacheKeyValueBytesStoreIterator#next()
          AbstractMergedSortedCacheStoreIterator#next()
          • MemoryLRUCacheBytesIterator.hasNext()
          • DelegatingPeekingKeyValueIterator.hasNext() (v)
          • AbstractMergedSortedCacheStoreIterator#nextStoreValue()
            • DelegatingPeekingKeyValueIterator#next()
              • DelegatingPeekingKeyValueIterator#hasNext() (v)
            • return
          • return
        • return outerkeyvalue
      • return
    • return
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue

 

...

Expand
titleCall trace 7: ReadOnlySessionStore#fetch(from, to)
  • CompositeReadOnlySessionStore#fetch(from, to) (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext() (v)
    • CompositKeyValueIterator#hasNext()
      • NextIteratorFunction#apply(store)
        • MeteredSessionStore#fetch(from, to)
          • MeteredSessionStore#findSession(from, to, 0, t)
            • CachingSessionStore#findSession(from, to, 0, t)
              • CachingSessionStore#validStoreOpen()
                AbstractStateStore#validStoreOpen() (v)
                • ChangeLoggingSessionBytesStore#isOpen()
                  • RocksDBSessionStore#isOpen()
                    AbstractStateStore#isOpen()
                    • RocksDBSegmentedBytesStore#isOpen()
                    • return
                  • return
                • return
              • ChangeLoggingSessionBytesStore#findSesisons()
                • RocksDBSessionStore#findSessions()
                  • RocksDBSegmentedBytesStore#fetch()
                    • SessionKeySchema#segmentsToSearch()
                      • Segments#segments() (v)
                        • RocksDBStore#isOpen()
                      • return new SegmentIterator()
                    • return new WrappedSessionStoreIterator()
                  • return
                • return
              • return new MergedSortedCacheSessionStoreIterator()
            • return MeteredWindowedKeyValueIterator()
          • return
        • return
      • MeteredWindowedKeyValueIterator#hasNext()
        • MergedSortedCacheSessionStoreIterator#hasNext()
          AbstractMergedSortedCacheStoreIterator#hasNext()
          • FilteredCacheIterator#hasNext()
          • WrappedSessionStoreIterator#hasNext()
            • SegmentIterator#hasNext()
              • Segment.range(from, to)
                RocksDBStore.range(from, to)
                • RocksDBStore.validateStoreOpen() (v)
                • return new RocksDBRangeIterator()
              • return
            • return
          • return
        • return
      • return
    • CompositeKeyValueIterator#next()
    • return
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue

 

 

 

Changed Classes

Code Block
languagejava
public interface QueryableStoreType<T> {
    T create(final KafkaStreams streams, final StateStoreProvider storeProvider, final String storeName);
}
Code Block
languagejava
titleKafkaStreams#store()
collapsetrue
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    return queryableStoreProvider.getStore(storeName, queryableStoreType);
}

...