Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Interactive Queries (IQ) throw InvalidStateStoreException for every type of error, so a user cannot tell the error types apart or handle them differently.

We should therefore throw a different exception for each type of error.

Proposed Changes

We add four new exceptions:

...

Three categories of exception are thrown to the user:

  • StateStoreRetryableException: The application instance is rebalancing; the user just needs to retry and wait until the rebalance has finished.
  • StateStoreMigratedException: The store has been migrated and is no longer hosted on this application instance; the user needs to rediscover the store.
  • StateStoreFailException: A fatal error occurred while accessing the state store; the user cannot retry or rediscover.

Two internal exceptions: StateStoreClosedException, StateStoreStreamThreadNotRunningException.
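As a rough illustration of how a caller might react to the three user-facing categories, here is a minimal sketch. The exception names come from this proposal; their constructors, the local stand-in for InvalidStateStoreException, and the QueryCaller helper are assumptions made only so that the example compiles without Kafka on the classpath.

```java
import java.util.function.Supplier;

// Local stand-in for org.apache.kafka.streams.errors.InvalidStateStoreException.
class InvalidStateStoreException extends RuntimeException {
    InvalidStateStoreException(String message) { super(message); }
}

// Proposed user-facing exceptions (constructor signatures are assumptions).
class StateStoreRetryableException extends InvalidStateStoreException {
    StateStoreRetryableException(String message) { super(message); }
}

class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(String message) { super(message); }
}

class StateStoreFailException extends InvalidStateStoreException {
    StateStoreFailException(String message) { super(message); }
}

// Hypothetical helper, not part of the proposal.
class QueryCaller {
    /**
     * Runs the query, retrying only while the instance reports a retryable
     * condition. Migrated and fatal errors propagate to the caller untouched.
     */
    static <T> T queryWithRetry(Supplier<T> query, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        StateStoreRetryableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (StateStoreRetryableException e) {
                last = e; // rebalance in progress: try again
            }
        }
        throw last; // still rebalancing after maxAttempts tries
    }
}
```

A real caller would probably sleep or back off between attempts, and would catch StateStoreMigratedException separately in order to rediscover the store.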

The following is the public method that users will call:

...

  • If the state is RUNNING or REBALANCING:
    • StateStoreClosedException: should be wrapped in StateStoreRetryableException
    • StateStoreMigratedException: should not be wrapped; it is thrown directly
  • If the state is PENDING_SHUTDOWN, ERROR, or NOT_RUNNING:
    • wrap InvalidStateStoreException (including subclasses) in StateStoreFailException
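The wrapping rules above can be sketched as a single translation step. This is only an illustration under stated assumptions: the StreamsState enum mirrors the state names used in the rules, ExceptionTranslator is a hypothetical helper, the exception constructors are guesses, and the pass-through of exception types that the rules do not mention is an assumption as well.

```java
// Local stand-in for org.apache.kafka.streams.errors.InvalidStateStoreException.
class InvalidStateStoreException extends RuntimeException {
    InvalidStateStoreException(String message) { super(message); }
    InvalidStateStoreException(String message, Throwable cause) { super(message, cause); }
}

// Internal exception from the proposal (constructor is an assumption).
class StateStoreClosedException extends InvalidStateStoreException {
    StateStoreClosedException(String message) { super(message); }
}

// User-facing exceptions from the proposal (constructors are assumptions).
class StateStoreRetryableException extends InvalidStateStoreException {
    StateStoreRetryableException(String message, Throwable cause) { super(message, cause); }
}

class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(String message) { super(message); }
}

class StateStoreFailException extends InvalidStateStoreException {
    StateStoreFailException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical enum holding only the states named in the rules.
enum StreamsState { RUNNING, REBALANCING, PENDING_SHUTDOWN, ERROR, NOT_RUNNING }

// Hypothetical helper, not part of the proposal.
class ExceptionTranslator {
    /** Returns the exception that should be thrown to the user. */
    static InvalidStateStoreException translate(StreamsState state, InvalidStateStoreException e) {
        switch (state) {
            case RUNNING:
            case REBALANCING:
                if (e instanceof StateStoreClosedException) {
                    return new StateStoreRetryableException(e.getMessage(), e);
                }
                // StateStoreMigratedException is thrown directly. Other types
                // are not covered by the rules; passing them through is an assumption.
                return e;
            case PENDING_SHUTDOWN:
            case ERROR:
            case NOT_RUNNING:
                // Any InvalidStateStoreException, including subclasses, becomes fatal.
                return new StateStoreFailException(e.getMessage(), e);
            default:
                throw new IllegalStateException("Unexpected state: " + state);
        }
    }
}
```

For example, `throw ExceptionTranslator.translate(StreamsState.REBALANCING, closedException);` would surface a StateStoreRetryableException whose cause is the original StateStoreClosedException.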

Call Trace

 

Call trace 1: KafkaStreams#store()
  • KafkaStreams#store() (v)
    • QueryableStoreProvider#getStore() (v)
      • GlobalStateStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)

...

Call trace 11: ReadOnlyWindowStore#fetchAll()
  • CompositeReadOnlyWindowStore#fetchAll()
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
      • return
    • return new DelegatingPeekingKeyValueIterator()
  • DelegatingPeekingKeyValueIterator#hasNext() (v)
    • CompositeKeyValueIterator#hasNext()
      • NextIteratorFunction#apply()
        • MeteredWindowStore#fetchAll()
          • CachingWindowStore#fetchAll()
            • AbstractStateStore#validateStoreOpen() (v)
            • ChangeLoggingWindowBytesStore#fetchAll()
              • RocksDBWindowBytesStore#fetchAll()
                • RocksDBSegmentedBytesStore#fetchAll()
                  • Segments#allSegments() (v)
                  • return new SegmentIterator()
                • return keyValueIterator
              • return
            • ThreadCache#all()
              • return new MemoryLRUCacheBytesIterator
            • return new MergedSortedCacheWindowStoreKeyValueIterator
          • return new MeteredWindowedKeyValueIterator()
        • return
      • MeteredWindowedKeyValueIterator#hasNext()
        • MergedSortedCacheWindowStoreKeyValueIterator#hasNext()
          AbstractMergedSortedCacheStoreIterator#hasNext()
          • WrappedKeyValueIterator#hasNext()
            • SegmentIterator#hasNext() (v)
              • return currentIterator
            • return
          • return
        • return
      • return
    • CompositeKeyValueIterator#next()
      • MeteredWindowedKeyValueIterator#next()
        • MergedSortedCacheWindowStoreKeyValueIterator#next()
          AbstractMergedSortedCacheStoreIterator#next()
          • WrappedKeyValueIterator#hasNext()
            • SegmentIterator#hasNext() (v)
            • return
          • AbstractMergedSortedCacheStoreIterator#nextStoreValue()
            • WrappedKeyValueIterator#next()
              • SegmentIterator#next()
                • RocksDbIterator#next()
                  • RocksDbIterator#hasNext() (v)
                  • RocksIterator.next()
                  • return entry
                • return
              • return keyvalue
            • return
          • return
        • return keyvalue
      • return
    • return
  • DelegatingPeekingKeyValueIterator#next()
    • DelegatingPeekingKeyValueIterator#hasNext()
    • return


Changes in call trace

 

```java
public interface QueryableStoreType<T> {
    // TODO: pass stream instance parameter
    T create(final KafkaStreams streams, final StateStoreProvider storeProvider, final String storeName);
}
```

...

class SegmentIterator

```java
public boolean hasNext() {
    boolean hasNext = false;
    while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen())
            && segments.hasNext()) {
        close();
        currentSegment = segments.next();
        try {
            if (from == null || to == null) {
                currentIterator = currentSegment.all();
            } else {
                currentIterator = currentSegment.range(from, to);
            }
        } catch (InvalidStateStoreException e) {  // TODO: Replace with StateStoreClosedException
            // segment may have been closed so we ignore it.
        }
    }
    return currentIterator != null && hasNext;
}
```

 

 

 

Compatibility, Deprecation, and Migration Plan

  • All new exceptions extend InvalidStateStoreException, so this change is fully backward compatible.
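To illustrate the compatibility argument, the sketch below shows an existing handler that catches only InvalidStateStoreException still catching one of the new subclasses. The LegacyCaller class, the exception constructors, and the local stand-in for InvalidStateStoreException are assumptions made for the example.

```java
// Local stand-in for org.apache.kafka.streams.errors.InvalidStateStoreException.
class InvalidStateStoreException extends RuntimeException {
    InvalidStateStoreException(String message) { super(message); }
}

// One of the proposed subclasses (constructor is an assumption).
class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(String message) { super(message); }
}

// Hypothetical pre-existing application code that knows nothing about the new types.
class LegacyCaller {
    static String query(Runnable storeAccess) {
        try {
            storeAccess.run();
            return "ok";
        } catch (InvalidStateStoreException e) {
            // Catching the base type also catches every new subclass.
            return "store unavailable: " + e.getMessage();
        }
    }
}
```

Applications that want finer-grained handling can add catch clauses for the subclasses ahead of the existing one, without changing the fallback.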

Rejected Alternatives

None.

