
 

 

Warning

We try to keep this doc up to date. However, because it describes internals that might change at any point in time, there is no guarantee that it reflects the latest state of the code base.

 

Lifecycle of a StreamThread

 

StreamThread Lifecycle

 

Lifecycle of a StreamTask and StandbyTask

 

StreamTask Lifecycle

 

Exception Handling

A Kafka Streams client needs to handle multiple different types of exceptions. We try to summarize what kinds of exceptions exist and how Kafka Streams should handle them. In general, Kafka Streams should be resilient to exceptions and keep processing even if some internal exceptions occur.

Types of Exceptions:

Exceptions can be categorized along several dimensions.

First, we can distinguish between recoverable and fatal exceptions. Recoverable exceptions should be handled internally and never bubble out to the user. For fatal exceptions, Kafka Streams is doomed to fail and cannot start/continue to process data.

Related to this are retriable exceptions. While retriable exceptions are recoverable in general, it might happen that the (configurable) retry counter is exceeded; in this case, we end up with a fatal exception.
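The transition from "retriable" to "fatal" can be illustrated with a bounded retry loop. This is only a minimal sketch under stated assumptions, not the actual Kafka Streams implementation: `SketchRetriableException`, `SketchRetriesExhaustedException`, and `BoundedRetry.callWithRetries` are hypothetical names introduced here for illustration, and `maxRetries` stands in for whatever retry configuration is actually used.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a retriable exception; not a Kafka class.
class SketchRetriableException extends RuntimeException {
    SketchRetriableException(String message) { super(message); }
}

// Hypothetical stand-in for the fatal exception raised once retries are exhausted.
class SketchRetriesExhaustedException extends RuntimeException {
    SketchRetriesExhaustedException(String message, Throwable cause) { super(message, cause); }
}

class BoundedRetry {
    // Runs the action and retries it on retriable exceptions, at most maxRetries times.
    // When the retry budget is used up, the last retriable exception is wrapped
    // into a fatal exception that is no longer handled here.
    static <T> T callWithRetries(Supplier<T> action, int maxRetries) {
        int retries = 0;
        while (true) {
            try {
                return action.get();
            } catch (SketchRetriableException e) {
                if (retries >= maxRetries) {
                    throw new SketchRetriesExhaustedException(
                        "giving up after " + retries + " retries", e);
                }
                retries++;
            }
        }
    }
}
```

Only the retriable type is caught; any other exception thrown by the action passes through untouched, which keeps the handling fine grained.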

The second distinction is between "external" and "internal" exceptions. By "external" we refer to any exception that could be returned by the brokers. "Internal" exceptions are those that are raised locally.

For "external" exceptions, we need to consider KafkaConsumer, KafkaProducer, and StreamsKafkaClient (to be replaced by AdminClient). For internal exceptions, we have for example (de)serialization, state store, and user code exceptions as well as any other exception Kafka Streams raises itself (e.g., configuration exceptions).

Last but not least, we single out exceptions that should never occur. If such an exception does occur, it indicates a bug, and thus all of these exceptions are fatal. All regular Java exceptions are in this category.

Coding implications:

  • We should never try to handle any fatal exceptions; we should only clean up and shut down
    • We should catch all those exceptions for cleanup only and rethrow them unmodified (they will eventually bubble out of the thread and trigger the uncaught exception handler if one is registered)
    • We should log those exceptions (at ERROR level) only once, at the thread level, before they bubble out of the thread, to avoid duplicate logging
  • We need to do fine-grained exception handling, i.e., catch exceptions individually instead of coarse grained, and react accordingly
  • All methods should have complete JavaDocs about the exceptions they might throw
  • All exception classes must have strictly defined semantics that are documented in their JavaDocs
  • For retriable exceptions, we should throw a special StreamsException if retries are exhausted
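The cleanup-and-rethrow rule for fatal exceptions from the list above can be sketched as follows. This is a hedged illustration, not Kafka Streams code: `FatalHandlingSketch`, `processTask`, `runThreadLoop`, `errorLog`, and `resourcesClosed` are hypothetical names, the list of strings stands in for a real logger, and `IllegalStateException` is used only because this page lists it among the fatal exceptions.

```java
import java.util.ArrayList;
import java.util.List;

class FatalHandlingSketch {
    // Stand-in for a logger; each entry represents one ERROR log line.
    static final List<String> errorLog = new ArrayList<>();
    // Stand-in for closing state stores, clients, and other resources.
    static boolean resourcesClosed = false;

    // Inner layer: catches the fatal exception for cleanup only,
    // does not log it, and rethrows the very same instance.
    static void processTask(Runnable work) {
        try {
            work.run();
        } catch (IllegalStateException fatal) {
            resourcesClosed = true;
            throw fatal; // rethrown unmodified, no wrapping
        }
    }

    // Thread level: the single place where the fatal exception is logged,
    // right before it bubbles out of the thread.
    static void runThreadLoop(Runnable work) {
        try {
            processTask(work);
        } catch (IllegalStateException fatal) {
            errorLog.add("ERROR: " + fatal);
            throw fatal;
        }
    }
}
```

Because the inner layer never logs and never wraps, the exception that reaches an uncaught exception handler is the original one, and it appears in the log exactly once.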

 

fatal (should never occur)

  • KafkaConsumer
    • local: IllegalArgumentException, IllegalStateException, WakeupException, InterruptException
    • remote: UnknownServerException
  • KafkaProducer
    • local: IllegalArgumentException, IllegalStateException, WakeupException, InterruptException
    • remote: UnknownServerException, OffsetMetadataTooLarge, SerializationException (we use <byte[],byte[]> as types)
  • StreamsKafkaClient
    • local: IllegalArgumentException, IllegalStateException, WakeupException, InterruptException
    • remote: UnknownServerException, InvalidTopicException
  • AdminClient
    • local: IllegalArgumentException, IllegalStateException, WakeupException, InterruptException
    • remote: UnknownServerException, InvalidTopicException
  • Streams API
    • local: (none)

fatal

  • KafkaConsumer
    • local: ConfigException
    • remote: AuthorizationException (including all subclasses), AuthenticationException (including all subclasses), SecurityDisabledException, InvalidTopicException
  • KafkaProducer
    • local: ConfigException
    • remote: AuthorizationException (including all subclasses), AuthenticationException (including all subclasses), SecurityDisabledException, InvalidTopicException, UnknownTopicOrPartitionException (retriable? refresh metadata?), RecordBatchTooLargeException, RecordTooLargeException
  • StreamsKafkaClient
    • local: ConfigException
    • remote: AuthorizationException (including all subclasses), AuthenticationException (including all subclasses), SecurityDisabledException
  • AdminClient
    • local: ConfigException
    • remote: AuthorizationException (including all subclasses), AuthenticationException (including all subclasses), SecurityDisabledException
  • Streams API
    • local: ConfigException, SerializationException

retriable

  • KafkaConsumer
    • local: (none)
    • remote: InvalidOffsetException (OffsetOutOfRangeException, NoOffsetForPartitionsException), CommitFailedException, TimeoutException, QuotaViolationException?
  • KafkaProducer
    • local: (none)
    • remote: CorruptedRecordException, NotEnoughReplicasAfterAppendException, OffsetOutOfRangeException (when can producer get this?), TimeoutException, QuotaViolationException?, BufferExhaustedException (verify)
  • StreamsKafkaClient
    • local: (none)
    • remote: (none)
  • AdminClient
    • local: (none)
    • remote: (none)
  • Streams API
    • local: (none)

recoverable

  • KafkaConsumer
    • local: (none)
    • remote: (none)
  • KafkaProducer
    • local: (none)
    • remote: ProducerFencedException
  • StreamsKafkaClient
    • local: (none)
    • remote: (none)
  • AdminClient
    • local: (none)
    • remote: (none)
  • Streams API
    • local: LockException

 

Looking at all KafkaException subclasses, there are some exceptions for which we need to double check whether they could bubble out of any client (or maybe we should not care, and treat all of them as fatal/remote exceptions).

-> DataException, SchemaBuilderException, SchemaProjectorException, RequestTargetException, NotAssignedException, IllegalWorkerStateException, ConnectRestException, BadRequestException, AlreadyExistsException (might be possible to occur, or only TopicExistsException), NotFoundException, ApiException, InvalidTimestampException, InvalidGroupException, InvalidReplicationFactorException (might be possible, but indicates a bug), o.a.k.common.errors.InvalidOffsetException and o.a.k.common.errors.OffsetOutOfRangeException (side note: do those need cleanup – they seem to be duplicates?), ReplicaNotAvailableException, UnknownServerException, OperationNotAttemptedException, PolicyViolationException, InvalidConfigurationException, InvalidFetchSizeException, InvalidReplicaAssignmentException, InconsistentGroupProtocolException, RebalanceInProgressException, LogDirNotFoundException, BrokerNotAvailableException, InvalidOffsetCommitSizeException, InvalidTxnTimeoutException, InvalidPartitionsException, TopicExistsException (cf. AlreadyExistsException), InvalidTxnStateException, UnsupportedForMessageFormatException, InvalidSessionTimeoutException, InvalidRequestException, IllegalGenerationException, InvalidRequiredAckException

-> RetryableException, CoordinatorNotAvailableException, RetryableCommitException, DuplicateSequenceNumberException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException, InvalidRecordException, DisconnectException, InvalidMetaDataException (NotLeaderForPartitionException, NoAvailableBrokersException, UnknownTopicOrPartitionException, KafkaStoreException, LeaderNotAvailableException), GroupCoordinatorNotAvailableException

Should never happen:

 - UnsupportedVersionException (we do a startup check and should not start processing for this case in the first place)

Handled by client (consumer, producer, admin) internally and should never bubble out of a client: (verify)

 - ConnectionException, RebalanceNeededException, InvalidPidMappingException, ConcurrentTransactionException, NotLeaderException, TransactionalCoordinatorFencedException, ControllerMovedException, UnknownMemberIdException, OutOfOrderSequenceException, CoordinatorLoadInProgressException, GroupLoadInProgressException, NotControllerException, NotCoordinatorException, NotCoordinatorForGroupException, StaleMetadataException, NetworkException

 

ExceptionHandling

 

 
