...

A connector consists of multiple stages. For source connectors, Connect retrieves the records from the connector, applies zero or more transformations, uses the converters to serialize each record’s key, value, and headers, and finally writes each record to Kafka. For sink connectors, Connect reads the topic(s), uses the converters to deserialize each record’s key, value, and headers, applies zero or more transformations to each record, and delivers the records to the sink connector. In this proposal, we will specifically deal with the following failure scenarios that can occur during these stages:

Operation                                  | Will Retry?                                                 | Tolerated Exceptions
Transformation                             | only on org.apache.kafka.connect.errors.RetriableException | java.lang.Exception
Key, Value and Header Converter            | only on org.apache.kafka.connect.errors.RetriableException | java.lang.Exception
Kafka Produce and Consume                  | only on org.apache.kafka.common.errors.RetriableException  | only on org.apache.kafka.connect.errors.RetriableException, fail task otherwise
put() in SinkTask and poll() in SourceTask | only on org.apache.kafka.connect.errors.RetriableException | only on org.apache.kafka.connect.errors.RetriableException, fail task otherwise
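
To make the retry contract in the table concrete, here is a minimal sketch of a custom transformation. The enrichment step and its exception type (enrich(), TransientLookupFailure) are hypothetical placeholders, not part of this KIP; the point is the exception contract: throwing org.apache.kafka.connect.errors.RetriableException asks Connect to reattempt the operation, while any other exception is handled according to the tolerance limit.

```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Sketch of a transformation participating in the retry behavior above.
 * The lookup step and its exception type are hypothetical placeholders.
 */
public class ExternalLookupTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        try {
            // Hypothetical enrichment against an external service.
            return enrich(record);
        } catch (TransientLookupFailure e) {
            // Transient failure: signal Connect to retry this operation.
            throw new RetriableException("Lookup temporarily unavailable", e);
        }
        // Any other (unchecked) exception propagates; the record is then
        // skipped or the task is failed, depending on the tolerance limit.
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }

    // Placeholder for the hypothetical external lookup.
    private R enrich(R record) throws TransientLookupFailure {
        return record;
    }

    /** Hypothetical checked exception for transient lookup failures. */
    static class TransientLookupFailure extends Exception {
    }
}
```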

There are two behavioral changes introduced by this KIP. First, a failure in any stage will be reattempted, thereby “blocking” the connector. This helps in situations where time is needed to manually update an external system, such as correcting a schema in the Schema Registry. More complex causes, such as required code changes or corruption in the data that cannot be fixed externally, will require the worker to be stopped, the data to be fixed, and the connector to be restarted. In the case of data corruption, the topic might need to be cleaned up too. If the retry limit for a failure is reached, the tolerance limit is then used to determine whether the record should be skipped or the task should be killed.

The second behavioral change is in how we report these failures. Currently, only the exception that kills the task is written to the application logs. With the additions presented in this KIP, we log details about the failed operation along with the bad record. We are also introducing an option to write bad records into a dead letter queue for sink connectors, which writes the original key, value, and headers of each failed record to a configured Kafka topic.
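
As an illustration, the snippet below shows how these behaviors might be enabled on a sink connector. The property names are not defined in this excerpt; they are the names this feature eventually shipped with in Apache Kafka 2.0, so treat them as an assumption when reading earlier drafts of this proposal.

```properties
# Assumed property names (as shipped in Apache Kafka 2.0); earlier drafts
# of this KIP may use different names.

# Keep retrying a failed operation for up to 5 minutes ...
errors.retry.timeout=300000
# ... backing off at most 60 seconds between attempts.
errors.retry.delay.max.ms=60000

# Once retries are exhausted, skip the bad record instead of killing the task.
errors.tolerance=all

# Report every failed operation, including the record itself, in the logs.
errors.log.enable=true
errors.log.include.messages=true

# Sink connectors only: also write failed records to a dead letter queue topic.
errors.deadletterqueue.topic.name=my-connector-errors
```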

...