...
Connect should allow users to configure how bad data failures should be handled during all phases of processing records. Some failures such as the lack of availability of some external components may be resolved by simply retrying, while other errors should be recorded and the problem records simply skipped. Where possible, Connect should be able to record the error and optionally include the problematic records and configuration states of the connector, transform and transformconverter. And since Since no single solution works for everyone, all of this error handling behavior should be configurable.
...
Config Option | Description | Default Value | Domain |
---|---|---|---|
errors.dlq.enable | Write the error context along into a Kafka topic | false | Boolean |
errors.dlq.topic.name | The name of the topic where these messages are written to | "" | A valid Kafka topic name |
errors.dlq.topic.partitions | Number of partitions for this topic | 5 | [1, 2 .. Integer.MAX_INT] |
errors.dlq.topic.replication.factor | The replication factor for this topic | 3 | [1, 2 .. Integer.MAX_INT] |
errors.dlq.include.configs | Include the (worker, connector) configs in the log. | false | Boolean |
errors.dlq.include.messages | Include the Connect Record which failed to process in the log. | false | Boolean |
errors.dlq.producer.* | Config for the KafkaProduce to produce to this topic (for example: errors.dlq.producer.bootstrap.servers will set the bootstrap servers of the Kafka cluster). | ||
errors.dlq.converter | Converter class used to convert the error context between Kafka Connect format and the serialized form that is written to Kafka. | org.apache.kafka. connect.json.JsonConverter | Any class which implements org.apache.kafka.connect.storage.Converter |
...
While logging the error context, it might be worthwhile to take precautions to hide sensitive content. For example, some of the configs might contain sensitive information such as usernames or passwords. To prevent logging critical information, we provide configuration options to disable logging the messages (errors.dlq.include.messages
) and /or configs (errors.dlq.include.configs
).
...
Code Block | ||
---|---|---|
| ||
# retry at most 100 times waiting up to 5 minutes between consecutive failures errors.retries.limit=100 errors.retries.delay.max.ms=300000 # log error context along with application logs, but do not include configs and messages errors.log.enable=true errors.log.include.configs=false errors.log.include.messages=false # produce error context into the Kafka topic errors.dlq.enable=true errors.dlq.topic.name=my-connector-errors errors.dlq.topic.partitions=25 errors.dlq.topic.replication.factor=3 errors.dlq.include.configs=true errors.dlq.include.messages=true # ToleranceTolerate all errors. errors.tolerance.limit=-1 errors.tolerance.rate.limit=-1 errors.tolerance.rate.duration.ms=60000 |
...
Code Block | ||
---|---|---|
| ||
# retry up to 100 times waiting up to 5 minutes between consecutive failures errors.retries.limit=100 errors.retries.delay.max.ms=300000 # log error context along with log4japplication appenderlogs, but do not include configs and messages errors.log.enable=true errors.log.include.configs=false errors.log.include.messages=false # produce error context into a secure Kafka topic errors.dlq.enable=true errors.dlq.topic.name=my-connector-errors errors.dlq.topic.partitions=25 errors.dlq.topic.replication.factor=3 errors.dlq.include.configs=true errors.dlq.include.messages=true errors.dlq.producer.bootstrap.servers=secure-broker:9093 errors.dlq.producer.acks = 0 errors.dlq.producer.security.protocol=SSL errors.dlq.producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks errors.dlq.producer.ssl.truststore.password=test1234 errors.dlq.producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks errors.dlq.producer.ssl.keystore.password=test1234 errors.dlq.producer.ssl.key.password=test1234 # ToleranceTolerate all errors. errors.tolerance.limit=-1 errors.tolerance.rate.limit=-1 errors.tolerance.rate.duration.ms=60000 |
...
Interceptors for Erroneous Records: Similar to ProducerInterceptor and ConsumerInterceptor, we could potentially add ErrorInterceptors too. But given that the ErrorHandler handlers subsumes most of the functionalities here, we decided to not provide this feature.
...