...
Now here's a full summary of all possible exceptions below:
Exception | Thrown Scenarios | Thrown places | Error Type | Suggestion |
---|---|---|---|---|
IllegalStateException | Checked for various call paths checking impossible situations, indicating a bug. | 1), Wrapped as KafkaException(e) | Fatal | We should always throw IllegalStateException directly without wrapping |
AuthenticationException | Only for txnal requests whose txn.id are not authenticated | 1), Wrapped as KE(e) | Fatal | None, all good |
InvalidPidMappingException | Only for txnal request with the encoded txnID are not recognized or its corresponding PID is incorrect | 1), Wrapped as KE(e) | After KIP-360 (2.5+), Abortable as we bump epoch; otherwise Fatal | None |
UnknownProducerIdException | Similar to InvalidPidMappingException but only for produce request (i.e. the PID is not recognized on the partition leader). NOTE this is removed as part of KIP-360, and hence would only be returned by old brokers. We keep this error code for now since we may re-use it in the future. | 1), Wrapped as KE(e) | After KIP-360 (2.5+), Abortable as we bump epoch; otherwise Fatal | None |
TransactionAbortedException | Only for produce request batches, when the txn is already aborting we would simply abort all unsent batches with this error | 2) | N/A since it is not an error case | None |
ClusterAuthorizationException TransactionalIdAuthorizationException UnsupportedVersionException UnsupportedForMessageFormatException | Most of these errors are returned from produce responses (txnal response could also return UnsupportedVersionException). When these errors return, we would immediately mark the txnManager to error state as well. These are examples where the exceptions could be thrown in both txnManager#maybeFailWithError as well as from send callback/future. | 1), Wrapped as KE(e); and 2) | Fatal | See below |
InvalidRecordException InvalidRequiredAcksException NotEnoughReplicasAfterAppendException NotEnoughReplicasException RecordBatchTooLargeException InvalidTopicException CorruptRecordException UnknownTopicOrPartitionException NotLeaderOrFollowerException TimeoutException | These are all errors returned from produce responses, that are non-fatal (timeout exception on expired batch). | 1), Wrapped as KE(e); and 2) | Abortable | See below |
TopicAuthorizationException GroupAuthorizationException | TopicAuthorizationException could be thrown via addPartition. GroupAuthorizationException could be thrown via sendOffsetToTxn / findCoordinator. Today they are all categorized as abortable but I think this should be fatal. | Abortable | Should be fatal. | |
FencedInstanceIdException CommitFailedException | Thrown from TxnOffsetCommit (CommitFailedException are translated from UNKNOWN_MEMBER and ILLEGAL_GENERATION). Today it's treated as abortable. BUT I think it should really be fatal since it's basically indicating a fenced situation. | 1) Wrapped as KE(e) | Abortable | Should be fatal. |
InvalidProducerEpochException | This error used to be returned from both txnal response and produce response, but as of KIP-588 (2.7+), we would not let txn coordinator to return InvalidProducerEpochException anymore, but only from partitions leaders on produce responses, also we treat this as ProducerFencedException at the client side, since only old versioned brokers would not return InvalidProducerEpochException now which should still be treated as fatal. HOWEVER, for TxnOffsetCommit (sent to the group coordinator) we did not do this conversion which is a bug — we should always convert to ProducerFenced. | 1), BUT not wrapped | Fatal if from txnal response (translated to ProducerFenced); Abortable if from produce response. | It's unclear why we wrap all other exceptions but leave these two un-wrapped; we should have a consistent wrapping mechanism. Plus, we should fix the bug for TxnOffsetCommit error handling. |
ProducerFencedException | This error used to be returned from both txnal response and produce response, but as of KIP-588 it should only be from txnal responses. It is a typical fatal error indicating that another producer with the same PID and newer epoch is in use. With KIP-447, producers from Kafka Streams should not be fenced by txn.id any more since we would fence them based on the GroupCoordinator instead; the actual case this would be thrown is usually when a txn is timed out (pending KIP-588 to be completed) | 1), BUT not wrapped | Fatal | It's unclear why we wrap all other exceptions but leave these two un-wrapped; we should have a consistent wrapping mechanism. |
OutOfOrderSequenceException | From produce response only, when the sequence does not match expected value | 1), Wrapped as KE(e) | Abortable (for idempotent producer we would handle it internally by bumping epoch) | See below |
InvalidTxnStateException | From txnal response, only, indicating the producer is issuing a request that it should not be. NOTE that we are handling this exception inconsistently: in endTxn it's wrapped as KE(e), in addPartitions it's wrapped as KE(KE(e)) | 1), Wrapped as either KE(e) or KE(KE(e))... | Fatal | Should fix the inconsistent wrapping. |
KafkaException | We definitely overloaded this one for various unrelated cases (which I think should be fixed): 1. when we failed to resolve those sequence-unresolved batches | 1), Wrapped as KE(KE) | After KIP-360 (2.5+), Abortable as we bump epoch; otherwise Fatal | Nested wrapping KafkaException(KafkaException(KafkaException..))) should be avoided. For this case I suggest we wrap as KE(OutOfSequenceException). |
2. when we are closing the producer, and hence need to garbage collect all pending txnal requests, we simply transit | 1), Wrapped as KE(KE) | Fatal | I don't think we should transit to error state at all for this case, and also shouldn't throw this exception either. | |
3. When a txnal response does not contain the "response()" field. | 1), Wrapped as KE(KE) | Fatal | Again, this should be an IllegalStateException since this should never happen. | |
4. All unexpected errors from txnal response | 1), Wrapped as KE(KE) | Fatal | Again, should not wrap it twice as KafkaException(KafkaException()). | |
5. When addPartition response returns with partition-level errors | 1), Wrapped as KE(KE(e)) | Abortable | Again should not wrap it twice as as KafkaException(KafkaException()). | |
RuntimeException | For any txnal requests, when request / response correlation id does not match | 1), Wrapped as KE(e) | Fatal | I think we should throw CorrelationIdMismatchException instead, which inherits from IllegalStateException, hence should not be wrapped either. |
Besides the detailed suggestions on each of the above category lines, there are also a few meta-level proposals:
...
This new exception type would be thrown back to the user only in the new producer API depicted in KIP-706.Stream side change
Streams Side Change
For EOS Kafka Streams case, we would adopt these simplified exception throwing logic by catching all exceptions in the data transmission phase to decide for Streams commit. Furthermore, these changes leave to door open for us to analyze the non-fatal exceptions thrown as well by unwrapping KafkaException's cause and reading failure type through callback.
More specifically:
For known exceptions such as ProducerFenced, the handling shall be simplified as we no longer need to wrap them as TaskMigratedException in the send callback, since they should not crash the stream thread if thrown in raw format, once we adopt the new processing model in the send phase.
...
- When handling lost-all-partitions, which would trigger when 1) the rebalance listener's onPartitionsLost are called, indicating the consumer member has been kicked out of the group, 2) a task-migration exception is thrown, we should not need to reset the producer by closing the current one and re-creating a new producer any more. Instead, we should still be able to reuse the same producer after we've re-joined the consumer group. Instead we just need to re-`initTxn` on the producer to make sure the previous dandling txns have been aborted before new transactions are about to start.
- We should distinguish exceptions thrown from the send() callback v.s. from the send() / commit() / etc call directly. With EOS, the only exception that would ONLY be thrown in the callback would be TransactionAbortedException, which we can actually ignore; and hence we would only need to just capture all exceptions thrown from the calls directly. That means we would handle exceptions differently between ALOS and EOS:
- ALOS: try to capture exceptions from the callback, handle them just as today.
- EOS: ignore exceptions from the callback, instead handle directly from the function calls.
Public Interfaces
As mentioned in the proposed changes section, we would be doing the following public API changes:
...
The failure type in the embed exception should be helpful, for example, they could choose to ignore transactional errors since other txn APIs are already taking care of them. We expect this error code to be implemented once KIP-706 is accepted, which would provide a more user-friendly send API with CompletableFuture or similar.
Documentation change
We shall put the newly marked fatal exceptions on the public Producer API docs correspondingly, including
...