Comment: rewrite proposed changes section

...

Code Block
languagejava
// Thrown to the user: retryable
public class StreamThreadNotStartedException extends InvalidStateStoreException
public class StreamThreadRebalancingException extends InvalidStateStoreException
public class StateStoreMigratedException extends InvalidStateStoreException

// Thrown to the user: not retryable
public class StreamThreadNotRunningException extends InvalidStateStoreException
public class StateStoreFailedException extends InvalidStateStoreException

// Internal exceptions
public class StateStoreIsEmptyException extends InvalidStateStoreException
public class StateStoreClosedException extends InvalidStateStoreException

Three categories of exception are thrown to the user:

  • StateStoreRetryableException: The application instance is rebalancing; the user only needs to retry and wait until the rebalance has finished.
  • StateStoreMigratedException: The store has been migrated and is no longer hosted on this application instance; the user needs to rediscover the store.
  • StateStoreFailException: A fatal error occurred while accessing the state store; the user can neither retry nor rediscover.

Three exceptions are internal: StateStoreClosedException, StateStoreEmptyException, and StreamThreadNotRunningException.

The purpose of the internal exceptions is to distinguish between the different kinds of InvalidStateStoreException. Each internal exception is eventually wrapped in one of the user-facing category exceptions.

For example, StreamThreadStateStoreProvider#stores() throws StreamThreadNotRunningException (an internal exception). KafkaStreams.store() then wraps it as StateStoreRetryableException or StateStoreFailException and throws that to the user.

 

The exception classes are used as follows:

  • StateStoreClosedException: thrown when the state store is not open. This is an internal exception and is later wrapped as StateStoreMigratedException or StateStoreFailedException.
  • StateStoreIsEmptyException: thrown when the state store cannot be found in any StateStoreProvider. This is an internal exception and is later wrapped as StateStoreMigratedException or StateStoreFailedException.
  • StreamThreadNotRunningException: thrown when the stream thread is not running and the stream state is PENDING_SHUTDOWN, NOT_RUNNING, or ERROR. The user cannot retry when this exception is thrown.
  • StreamThreadNotStartedException: thrown when the stream thread is not running and the stream state is CREATED. The user can retry until the state becomes RUNNING.
  • StreamThreadRebalancingException: thrown when the stream thread is not running and the stream state is REBALANCING. The user only needs to retry and wait until the rebalance has finished (RUNNING).
  • StateStoreMigratedException: a wrapper for StateStoreClosedException / StateStoreIsEmptyException, thrown when the stream thread is RUNNING or REBALANCING. The user needs to rediscover the state store.
  • StateStoreFailedException: a wrapper for StateStoreClosedException / StateStoreIsEmptyException, thrown when the stream thread is PENDING_SHUTDOWN, NOT_RUNNING, or ERROR. The user cannot retry when this exception is thrown.
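
The wrapping rules in the list above can be sketched as a small state-to-exception mapping. This is a minimal illustration under stated assumptions, not the KIP's implementation: StreamsState is a stand-in for KafkaStreams.State, the exception classes are local stand-ins for the proposed ones, and ExceptionWrapping.wrap is a hypothetical helper name. The list does not say what happens to an internal exception while the state is CREATED, so the sketch returns it unchanged in that case.

```java
// Stand-in for KafkaStreams.State, limited to the states named in the list above.
enum StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

// Local stand-ins for the proposed exception classes (assumption: simplified constructors).
class InvalidStateStoreException extends RuntimeException {
    InvalidStateStoreException(String message) { super(message); }
    InvalidStateStoreException(String message, Throwable cause) { super(message, cause); }
}

// Internal exceptions: not meant to reach the user directly.
class StateStoreClosedException extends InvalidStateStoreException {
    StateStoreClosedException(String message) { super(message); }
}

class StateStoreIsEmptyException extends InvalidStateStoreException {
    StateStoreIsEmptyException(String message) { super(message); }
}

// User-facing wrappers.
class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(String message, Throwable cause) { super(message, cause); }
}

class StateStoreFailedException extends InvalidStateStoreException {
    StateStoreFailedException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical helper: picks the user-facing wrapper from the current stream state.
final class ExceptionWrapping {
    private ExceptionWrapping() { }

    static InvalidStateStoreException wrap(InvalidStateStoreException internal, StreamsState state) {
        switch (state) {
            case RUNNING:
            case REBALANCING:
                // The store may have moved to another instance; the caller should rediscover it.
                return new StateStoreMigratedException("State store migrated; rediscover it", internal);
            case PENDING_SHUTDOWN:
            case NOT_RUNNING:
            case ERROR:
                // The instance is going away or broken; retrying cannot succeed.
                return new StateStoreFailedException("State store failed; retrying will not help", internal);
            default:
                // CREATED is not covered by the list above; pass the exception through unchanged.
                return internal;
        }
    }
}
```

Keeping the original exception as the cause preserves the internal distinction (closed versus not found) for logging, while the caller only has to react to the wrapper type.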


The QueryableStoreType interface gains a new method, which is used to pass the KafkaStreams instance to every CompositeReadOnlyXXXXStore instance.

Code Block
languagejava
public interface QueryableStoreType<T> {
  void setStreams(final KafkaStreams kafkaStreams);
}
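
One way the new method could be used is sketched below: the store type remembers the KafkaStreams instance and hands it to the composite store it creates, so the store can later consult the stream state. Everything here is a simplified assumption for illustration. KafkaStreams is reduced to a local stand-in with a state accessor, create() takes no arguments (the real method takes a store provider and a store name), and CompositeReadOnlyStore, CompositeStoreType, and setState are hypothetical names.

```java
import java.util.Objects;

// Local stand-in for org.apache.kafka.streams.KafkaStreams, reduced to its state.
class KafkaStreams {
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    private volatile State state = State.CREATED;

    State state() { return state; }

    // Hypothetical hook so the example can simulate a state transition.
    void setState(State newState) { state = newState; }
}

// Reduced version of the interface: the proposed method plus a simplified create().
interface QueryableStoreType<T> {
    void setStreams(KafkaStreams kafkaStreams);

    T create();
}

// Hypothetical composite store that knows which KafkaStreams instance it belongs to.
class CompositeReadOnlyStore {
    private final KafkaStreams streams;

    CompositeReadOnlyStore(KafkaStreams streams) {
        this.streams = Objects.requireNonNull(streams, "streams must be set before create()");
    }

    // The store can check the live stream state when deciding how to wrap an exception.
    KafkaStreams.State streamsState() { return streams.state(); }
}

// Hypothetical store type: keeps the reference and passes it to every store it creates.
class CompositeStoreType implements QueryableStoreType<CompositeReadOnlyStore> {
    private KafkaStreams streams;

    @Override
    public void setStreams(KafkaStreams kafkaStreams) { this.streams = kafkaStreams; }

    @Override
    public CompositeReadOnlyStore create() { return new CompositeReadOnlyStore(streams); }
}
```

Because the store holds a reference rather than a snapshot, it sees the state at the time of each query, not the state at the time store() was called.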


 

The following are the public methods that users will call:

  • KafkaStreams
    • store()
  • ReadOnlyKeyValueStore(CompositeReadOnlyKeyValueStore)
    • get(k)
    • range(from, to)
    • all()
    • approximateNumEntries()
  • ReadOnlySessionStore(CompositeReadOnlySessionStore)
    • fetch(k)
    • fetch(from, to)
  • ReadOnlyWindowStore(CompositeReadOnlyWindowStore)
    • fetch(k, rf, tt)
    • fetch(from, to, rf, tt)
    • all()
    • fetchAll()
  • KeyValueIterator(DelegatingPeekingKeyValueIterator)
    • next()
    • hasNext()
    • peekNextKey()
  • WindowStoreIterator(MeteredWindowStoreIterator)
    • next()
    • hasNext()
    • peekNextKey()
Code Block
languagejava
public class KafkaStreams {
    public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType);
}
 
public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> {
    public V get(final K key);
    public KeyValueIterator<K, V> range(final K from, final K to);
    public KeyValueIterator<K, V> all();
    public long approximateNumEntries();
}
 
public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore<K, V> {
    public KeyValueIterator<Windowed<K>, V> fetch(final K key);
    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to);
}
 
public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K, V> {
    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo);
    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo);
    public KeyValueIterator<Windowed<K>, V> all();
    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo);
}
 
class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> {
    public synchronized boolean hasNext();
    public synchronized KeyValue<K, V> next();
    public KeyValue<K, V> peekNext();
}

class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> {
    public boolean hasNext();
    public KeyValue<Long, V> next();
    public Long peekNextKey();
}
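
From the caller's side, the split between retryable and non-retryable exceptions suggests a handling pattern like the one below. It is a sketch only: QueryWithRetry and its parameters (call, rediscover, maxAttempts, backoffMs) are hypothetical, and the exception classes are local stand-ins so that the example compiles without Kafka on the classpath.

```java
import java.util.function.Supplier;

// Local stand-ins for the proposed exception classes.
class InvalidStateStoreException extends RuntimeException {
    InvalidStateStoreException(String message) { super(message); }
}

class StreamThreadNotStartedException extends InvalidStateStoreException {
    StreamThreadNotStartedException(String message) { super(message); }
}

class StreamThreadRebalancingException extends InvalidStateStoreException {
    StreamThreadRebalancingException(String message) { super(message); }
}

class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(String message) { super(message); }
}

class StreamThreadNotRunningException extends InvalidStateStoreException {
    StreamThreadNotRunningException(String message) { super(message); }
}

class StateStoreFailedException extends InvalidStateStoreException {
    StateStoreFailedException(String message) { super(message); }
}

// Hypothetical caller-side helper around any of the public query methods.
final class QueryWithRetry {
    private QueryWithRetry() { }

    static <T> T query(Supplier<T> call, Runnable rediscover, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get();
            } catch (StreamThreadNotStartedException | StreamThreadRebalancingException e) {
                // Retryable: wait for the instance to reach RUNNING, then try again.
                if (attempt >= maxAttempts) throw e;
                pause(backoffMs, e);
            } catch (StateStoreMigratedException e) {
                // Retryable after rediscovery: the store may now live elsewhere.
                if (attempt >= maxAttempts) throw e;
                rediscover.run();
            }
            // StreamThreadNotRunningException and StateStoreFailedException are not
            // caught, so they propagate to the caller immediately.
        }
    }

    private static void pause(long millis, RuntimeException pending) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag and give up with the pending exception.
            Thread.currentThread().interrupt();
            throw pending;
        }
    }
}
```

A caller would pass the actual query as the supplier, for example a lambda that calls store() and then get(k), and a rediscover action that looks up which instance currently hosts the store.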

When the user calls one of the public methods listed above and an InvalidStateStoreException is thrown, we should check the KafkaStreams state and apply the following rules:

...

  • StateStoreClosedException: should be wrapped to StateStoreRetryableException
  • StreamThreadNotRunningException: should be wrapped to StateStoreRetryableException
  • StateStoreEmptyException: should be wrapped to StateStoreMigratedException

...


Call Trace

 

Call trace 1: KafkaStreams#store()
  • KafkaStreams#store() (v)
    • QueryableStoreProvider#getStore() (v)
      • GlobalStateStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)

...

Call trace 7: ReadOnlySessionStore#fetch(from, to)
  • CompositeReadOnlySessionStore#fetch(from, to) (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext() (v)
    • CompositeKeyValueIterator#hasNext()
      • NextIteratorFunction#apply(store)
        • MeteredSessionStore#fetch(from, to)
          • MeteredSessionStore#findSessions(from, to, 0, t)
            • CachingSessionStore#findSessions(from, to, 0, t)
              • CachingSessionStore#validateStoreOpen()
                AbstractStateStore#validateStoreOpen() (v)
                • ChangeLoggingSessionBytesStore#isOpen()
                  • RocksDBSessionStore#isOpen()
                    AbstractStateStore#isOpen()
                    • RocksDBSegmentedBytesStore#isOpen()
                    • return
                  • return
                • return
              • ChangeLoggingSessionBytesStore#findSessions()
                • RocksDBSessionStore#findSessions()
                  • RocksDBSegmentedBytesStore#fetch()
                    • SessionKeySchema#segmentsToSearch()
                      • Segments#segments() (v)
                        • RocksDBStore#isOpen()
                      • return new SegmentIterator()
                    • return new WrappedSessionStoreIterator()
                  • return
                • return
              • return new MergedSortedCacheSessionStoreIterator()
            • return new MeteredWindowedKeyValueIterator()
          • return
        • return
      • MeteredWindowedKeyValueIterator#hasNext()
        • MergedSortedCacheSessionStoreIterator#hasNext()
          AbstractMergedSortedCacheStoreIterator#hasNext()
          • FilteredCacheIterator#hasNext()
          • WrappedSessionStoreIterator#hasNext()
            • SegmentIterator#hasNext()
              • Segment.range(from, to)
                RocksDBStore.range(from, to)
                • RocksDBStore.validateStoreOpen() (v)
                • return new RocksDBRangeIterator()
              • return
            • return
          • return
        • return
      • return
    • CompositeKeyValueIterator#next()
    • return
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue

...

Rejected Alternatives

None.

