
A Kafka Streams client needs to handle several different types of exceptions. This page summarizes which kinds of exceptions exist and how Kafka Streams should handle them. In general, Kafka Streams should be resilient to exceptions and keep processing even if some internal exceptions occur.

Types of Exceptions:

Exceptions can be categorized along several dimensions.

First, we can distinguish between retryable and non-retryable (i.e., fatal) exceptions. Retryable exceptions should be handled internally and only bubble out if a (configurable) number of retries was unsuccessful. For non-retryable/fatal exceptions, Kafka Streams is doomed to fail and cannot start or continue to process data.


the exception thrown during processing of this record. Null if no error occurred.
     *                  Possible thrown exceptions include:
     *                  Non-Retriable exceptions (fatal, the message will never be sent):
     *                  InvalidTopicException
     *                  OffsetMetadataTooLargeException
     *                  RecordBatchTooLargeException
     *                  RecordTooLargeException
     *                  UnknownServerException
     *                  Retriable exceptions (transient, may be covered by increasing #.retries):
     *                  CorruptRecordException
     *                  InvalidMetadataException
     *                  NotEnoughReplicasAfterAppendException
     *                  NotEnoughReplicasException
     *                  OffsetOutOfRangeException
     *                  TimeoutException
     *                  UnknownTopicOrPartitionException
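
The retry-then-bubble-out behavior described above can be sketched as follows. This is a minimal illustration, not the Streams implementation: `RetriableError`, `FatalError`, `Action`, and `BoundedRetry` are hypothetical stand-ins (so the sketch compiles without a Kafka dependency), and `maxRetries` stands for the configurable retry count.

```java
// Hypothetical stand-in for a retriable (transient) exception type.
class RetriableError extends RuntimeException {
    RetriableError(String message) { super(message); }
}

// Hypothetical stand-in for a non-retriable (fatal) exception type.
class FatalError extends RuntimeException {
    FatalError(String message) { super(message); }
}

// Minimal functional interface for the operation to attempt.
interface Action<T> {
    T run();
}

final class BoundedRetry {
    // Runs the action and retries it on RetriableError, at most maxRetries times.
    // Any other exception (e.g. FatalError) is not caught and propagates immediately.
    static <T> T call(Action<T> action, int maxRetries) {
        int retries = 0;
        while (true) {
            try {
                return action.run();
            } catch (RetriableError e) {
                if (retries >= maxRetries) {
                    throw e; // retries exhausted: let the exception bubble out
                }
                retries++;
            }
        }
    }
}
```

With `maxRetries = 3`, an action that fails twice with `RetriableError` and then succeeds returns normally on the third attempt, while a `FatalError` is never retried.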

Not all exceptions that could potentially occur are exceptions we expect to ever happen. If an unexpected exception occurs, it indicates a bug in the Streams API code base. Thus, we should fail fast to get a proper bug report from the field.


  • expected:
    • ConfigException (fatal)
    • InvalidOffsetException (handled by StreamThread)
      1. OffsetOutOfRangeException
      2. NoOffsetForPartitionException
    • CommitFailedException [non-EOS only] (handled by StreamThread: swallow and retry on next commit) -> not swallowed for RebalanceCallback (needs to be fixed)
    • QuotaViolationException (fatal ?)
    • AuthorizationException (fatal)
    • SecurityDisabledException (fatal)
    • InvalidTopicException (fatal)
    • all RetriableException
  • should never happen (all fatal):
    • ConcurrentModificationException
    • WakeupException
    • InterruptedException
    • IllegalArgumentException
    • IllegalStateException
    • All ApiException that are not mentioned somewhere else


  • expected:
    • BufferExhaustedException (fatal)
    • SerializationException (fatal)
    • ProducerFencedException
    • SecurityDisabledException (fatal)
    • all RetriableException
  • should never happen:
    • All ApiException that are not mentioned somewhere else


The second categorization is "external" vs. "internal" exceptions. By "external" we refer to any exception that could be returned by the brokers. "Internal" exceptions are those that are raised locally.

For "external" exceptions, we need to consider KafkaConsumer, KafkaProducer, and StreamsKafkaClient (to be replaced by AdminClient). For internal exceptions, we have for example (de)serialization, state store, and user code exceptions as well as any other exception Kafka Streams raises itself (e.g., configuration exceptions).

Last but not least, we distinguish between expected exceptions and exceptions that should never occur. If the latter do occur, they indicate a bug, and thus all of them are fatal. All regular Java exceptions are in this category.
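
As a rough illustration of how these categories could translate into a handling decision, here is a minimal sketch. All names (`RetriableStandIn`, `ConfigStandIn`, `Handling`, `ExceptionTriage`) are hypothetical stand-ins, not Kafka classes. The sketch assumes that anything not explicitly listed is treated as "should never occur".

```java
// Hypothetical stand-ins for Kafka exception types, so the sketch needs no Kafka dependency.
class RetriableStandIn extends RuntimeException { }
class ConfigStandIn extends RuntimeException { }

// The three handling decisions discussed in the text.
enum Handling { RETRY, FAIL_EXPECTED, FAIL_BUG }

final class ExceptionTriage {
    static Handling triage(Throwable error) {
        if (error instanceof RetriableStandIn) {
            return Handling.RETRY;         // transient: handle internally
        }
        if (error instanceof ConfigStandIn) {
            return Handling.FAIL_EXPECTED; // expected but fatal: clean up and shut down
        }
        // Regular Java exceptions and anything unlisted should never occur:
        // treat as a bug and fail fast.
        return Handling.FAIL_BUG;
    }
}
```

Defaulting to `FAIL_BUG` mirrors the fail-fast idea: an exception nobody classified is more likely a bug than something safe to retry.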

Coding implications:

  • We should never try to handle any fatal exceptions, but only clean up and shut down.
    • We should catch all those exceptions (e.g., all Java exceptions and the generic KafkaException) for cleanup only and rethrow them unmodified (they will eventually bubble out of the thread and trigger the uncaught exception handler if one is registered)
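
The catch-for-cleanup-and-rethrow pattern can be sketched with plain JDK threads. `SketchWorker` and `FatalDemo` are hypothetical names for illustration; the only real API used is `Thread.setUncaughtExceptionHandler`.

```java
// A worker that fails with a fatal exception; it catches only to clean up, then rethrows.
final class SketchWorker implements Runnable {
    volatile boolean cleanedUp = false;
    private final RuntimeException failure;

    SketchWorker(RuntimeException failure) { this.failure = failure; }

    @Override
    public void run() {
        try {
            throw failure;       // simulate a fatal error during processing
        } catch (RuntimeException fatal) {
            cleanedUp = true;    // release resources only, no attempt to recover
            throw fatal;         // rethrow the same instance, unmodified
        }
    }
}

final class FatalDemo {
    // Runs the worker on its own thread and returns what the uncaught exception handler saw.
    static Throwable runAndCapture(SketchWorker worker) {
        Throwable[] seen = new Throwable[1];
        Thread thread = new Thread(worker, "stream-thread-sketch");
        thread.setUncaughtExceptionHandler((t, error) -> seen[0] = error);
        thread.start();
        try {
            thread.join();       // wait until the thread has died
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return seen[0];
    }
}
```

Because the exception is rethrown unmodified, the registered handler receives the original exception instance, which keeps the stack trace useful for bug reports.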



Exceptions by client (KafkaConsumer, KafkaProducer, StreamsKafkaClient, AdminClient, Streams API):

  • fatal (should never occur):
    • KafkaConsumer: IllegalArgumentException, IllegalStateException, WakeupException, InterruptException, UnknownServerException
    • KafkaProducer: IllegalArgumentException, IllegalStateException, WakeupException, InterruptException, UnknownServerException, OffsetMetadataTooLargeException, SerializationException (we use <byte[],byte[]> as types)
    • StreamsKafkaClient: IllegalArgumentException, IllegalStateException, WakeupException, InterruptException, UnknownServerException, InvalidTopicException
    • AdminClient: IllegalArgumentException, IllegalStateException, WakeupException, InterruptException, InvalidTopicException
  • fatal:
    • KafkaConsumer: ConfigException, AuthorizationException (including all subclasses), AuthenticationException (including all subclasses), SecurityDisabledException, InvalidTopicException
    • KafkaProducer: ConfigException, AuthorizationException (including all subclasses), AuthenticationException (including all subclasses), SecurityDisabledException, InvalidTopicException, UnknownTopicOrPartitionException (retryable? refresh metadata?), RecordBatchTooLargeException, RecordTooLargeException
    • StreamsKafkaClient: ConfigException, AuthorizationException (including all subclasses), AuthenticationException (including all subclasses), SecurityDisabledException
    • AdminClient: ConfigException, AuthorizationException (including all subclasses), AuthenticationException (including all subclasses), SecurityDisabledException
    • Streams API: ConfigException, SerializationException
  • retryable:
    • KafkaConsumer: InvalidOffsetException (OffsetOutOfRangeException, NoOffsetForPartitionException), CommitFailedException, TimeoutException, QuotaViolationException?
    • KafkaProducer: CorruptRecordException, NotEnoughReplicasAfterAppendException, OffsetOutOfRangeException (when can producer get this?), TimeoutException, QuotaViolationException?, BufferExhaustedException (verify)

Looking at all KafkaException subclasses, there are some exceptions for which we need to double check whether they could bubble out of any client (or maybe we should not care, and treat all of them as fatal/remote exceptions).

-> DataException, SchemaBuilderException, SchemaProjectorException, RequestTargetException, NotLeaderException, NotAssignedException, IllegalWorkerStateException, ConnectRestException, BadRequestException, AlreadyExistsException (might be possible to occur, or only TopicExistsException), NotFoundException, ApiException, InvalidTimestampException, InvalidGroupException, InvalidReplicationFactorException (might be possible, but indicates a bug), o.a.k.common.errors.InvalidOffsetException and o.a.k.common.errors.OffsetOutOfRangeException (side note: do those need cleanup? They seem to be duplicates), ReplicaNotAvailableException, UnknownServerException, OperationNotAttemptedException, TransactionCoordinatorFencedException, PolicyViolationException, ControllerMovedException, UnknownMemberIdException, InvalidConfigurationException, InvalidFetchSizeException, InvalidReplicaAssignmentException, OutOfOrderSequenceException, InconsistentGroupProtocolException, RebalanceInProgressException, LogDirNotFoundException, BrokerNotAvailableException, InvalidOffsetCommitSizeException, InvalidTxnTimeoutException, InvalidPartitionsException, TopicExistsException (cf. AlreadyExistsException), InvalidTxnStateException, ProducerFencedException, UnsupportedForMessageFormatException, UnsupportedVersionException, InvalidSessionTimeoutException, InvalidRequestException, IllegalGenerationException, InvalidRequiredAcksException

-> RetriableException, CoordinatorLoadInProgressException, GroupLoadInProgressException, CoordinatorNotAvailableException, NotControllerException, RetriableCommitFailedException, DuplicateSequenceNumberException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException, InvalidRecordException, NotCoordinatorForGroupException, DisconnectException, InvalidMetadataException (NotLeaderForPartitionException, NoAvailableBrokersException, NetworkException, UnknownTopicOrPartitionException, KafkaStoreException, StaleMetadataException, LeaderNotAvailableException), NotCoordinatorException, GroupCoordinatorNotAvailableException

Handled by client internally: (verify)

 - ConnectionException, RebalanceNeededException, InvalidPidMappingException, ConcurrentTransactionsException

Might bubble out:





What about

  • TopicExistsException (consumer group leader "split brain"; might be self-healing)
