...

Motivation

Currently, Interactive Queries (IQ) throws InvalidStateStoreException for every type of error, which means a user cannot distinguish between different types of error and handle each one appropriately.

...

The main change is to introduce new exceptions that extend InvalidStateStoreException. InvalidStateStoreException itself is no longer thrown; only its new subclasses are.

 

Code Block (Java)
// Exceptions thrown to the user
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException

// Internal exceptions
public class StateStoreClosedException extends InvalidStateStoreException
public class StateStoreEmptyException extends InvalidStateStoreException
public class StreamThreadNotRunningException extends InvalidStateStoreException
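
As an illustration of why separate user-facing exceptions help, here is a minimal sketch of how a caller might react to each one. The retry-on-StateStoreRetryableException behaviour is an assumption inferred from the class names, not something this proposal specifies; QueryHelper and queryWithRetry are hypothetical names, and the exception classes are local stand-ins so that the sketch compiles without Kafka on the classpath.

```java
// Stand-ins for the proposed hierarchy (assumption: the real classes would
// live in org.apache.kafka.streams.errors and may have other constructors).
class InvalidStateStoreException extends RuntimeException {
    InvalidStateStoreException(String message) { super(message); }
}
class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(String message) { super(message); }
}
class StateStoreRetryableException extends InvalidStateStoreException {
    StateStoreRetryableException(String message) { super(message); }
}
class StateStoreFailException extends InvalidStateStoreException {
    StateStoreFailException(String message) { super(message); }
}

// Hypothetical caller-side helper, not part of the proposal.
final class QueryHelper {
    private QueryHelper() { }

    // Runs the query, retrying only while StateStoreRetryableException is thrown.
    // StateStoreMigratedException and StateStoreFailException propagate to the
    // caller, who can rediscover the store's host or give up, respectively.
    static <T> T queryWithRetry(java.util.function.Supplier<T> query, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        StateStoreRetryableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (StateStoreRetryableException e) {
                last = e; // assumed transient: try again
            }
        }
        throw last; // all attempts failed with a retryable error
    }
}
```

A caller would wrap its store lookup and read in the supplier, for example `QueryHelper.queryWithRetry(() -> lookUp(key), 5)`, where `lookUp` is whatever code obtains the store and reads from it. A production version would likely add a back-off between attempts.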

...

Three categories of exception are thrown to the user:

...

When a user calls one of the above methods and an InvalidStateStoreException is thrown, we should check the KafkaStreams state and apply the following rules:

  • If the state is RUNNING or REBALANCING:
    • StateStoreClosedException: should be wrapped in StateStoreRetryableException
    • StreamThreadNotRunningException: should be wrapped in StateStoreRetryableException
    • StateStoreEmptyException: should be wrapped in StateStoreMigratedException
  • If the state is PENDING_SHUTDOWN, ERROR, or NOT_RUNNING:
    • InvalidStateStoreException (including subclasses): should be wrapped in StateStoreFailException
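
The wrapping rules above can be sketched as a single translation function. This is only an illustration under stated assumptions: StoreExceptionTranslator, translate, and the StreamsState enum are hypothetical names (StreamsState stands in for KafkaStreams.State and lists only the states named in the rules), the exception classes are local stand-ins with assumed constructors, and cases the rules do not cover simply return the original exception.

```java
// Stand-ins for the proposed hierarchy so the sketch compiles on its own.
class InvalidStateStoreException extends RuntimeException {
    InvalidStateStoreException(String message) { super(message); }
    InvalidStateStoreException(String message, Throwable cause) { super(message, cause); }
}

// Internal exceptions.
class StateStoreClosedException extends InvalidStateStoreException {
    StateStoreClosedException(String message) { super(message); }
}
class StateStoreEmptyException extends InvalidStateStoreException {
    StateStoreEmptyException(String message) { super(message); }
}
class StreamThreadNotRunningException extends InvalidStateStoreException {
    StreamThreadNotRunningException(String message) { super(message); }
}

// Exceptions thrown to the user; each keeps the internal exception as its cause.
class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(String message, Throwable cause) { super(message, cause); }
}
class StateStoreRetryableException extends InvalidStateStoreException {
    StateStoreRetryableException(String message, Throwable cause) { super(message, cause); }
}
class StateStoreFailException extends InvalidStateStoreException {
    StateStoreFailException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical stand-in for KafkaStreams.State, limited to the states in the rules.
enum StreamsState { RUNNING, REBALANCING, PENDING_SHUTDOWN, ERROR, NOT_RUNNING }

// Hypothetical helper, not part of the proposal.
final class StoreExceptionTranslator {
    private StoreExceptionTranslator() { }

    static InvalidStateStoreException translate(StreamsState state, InvalidStateStoreException e) {
        switch (state) {
            case RUNNING:
            case REBALANCING:
                if (e instanceof StateStoreClosedException
                        || e instanceof StreamThreadNotRunningException) {
                    return new StateStoreRetryableException(e.getMessage(), e);
                }
                if (e instanceof StateStoreEmptyException) {
                    return new StateStoreMigratedException(e.getMessage(), e);
                }
                return e; // not covered by the rules: left unchanged in this sketch
            case PENDING_SHUTDOWN:
            case ERROR:
            case NOT_RUNNING:
                return new StateStoreFailException(e.getMessage(), e);
            default:
                return e; // unreachable with the states listed above
        }
    }
}
```

Keeping the original exception as the cause preserves the internal detail (closed store, empty store, thread not running) for logging while the user only has to catch the three user-facing types.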

Call Trace

 

Call trace 1: KafkaStreams#store()
  • KafkaStreams#store() (v)
    • QueryableStoreProvider#getStore() (v)
      • GlobalStateStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)

...