...

Connect should allow users to configure how bad data and other failures are handled during all phases of processing records. Some failures, such as an external component being unavailable, may be resolved by retrying, while other errors should be recorded and the problem records skipped. Where possible, Connect should be able to record the error and optionally include the problematic records and the configuration states of the connector, transform and converter. Since no single solution works for everyone, all of this error handling behavior should be configurable.

...

| Config Option | Description | Default Value | Domain |
|---|---|---|---|
| errors.dlq.enable | Write the error context into a Kafka topic. | false | Boolean |
| errors.dlq.topic.name | The name of the topic where these messages are written. | "" | A valid Kafka topic name |
| errors.dlq.topic.partitions | Number of partitions for this topic. | 5 | [1, 2 .. Integer.MAX_VALUE] |
| errors.dlq.topic.replication.factor | The replication factor for this topic. | 3 | [1, 2 .. Integer.MAX_VALUE] |
| errors.dlq.include.configs | Include the (worker, connector) configs in the log. | false | Boolean |
| errors.dlq.include.messages | Include the Connect Record which failed to process in the log. | false | Boolean |
| errors.dlq.producer.* | Config for the KafkaProducer that produces to this topic (for example, errors.dlq.producer.bootstrap.servers sets the bootstrap servers of the Kafka cluster). | | |
| errors.dlq.converter | Converter class used to convert the error context between Kafka Connect format and the serialized form that is written to Kafka. | org.apache.kafka.connect.json.JsonConverter | Any class which implements org.apache.kafka.connect.storage.Converter |

...

While logging the error context, it might be worthwhile to take precautions to hide sensitive content. For example, some of the configs might contain sensitive information such as usernames or passwords. To prevent logging such information, we provide configuration options to disable logging the messages (errors.dlq.include.messages) and/or the configs (errors.dlq.include.configs).
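
To make the effect of the two include flags concrete, here is a minimal sketch of how an error context could be assembled. The class name, method name and the map keys ("exception", "message", "configs", "record") are hypothetical and chosen only for illustration; the proposal does not prescribe this layout.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: names and map keys are assumptions, not part of the proposal.
final class ErrorContextSketch {

    // Builds the error context, adding configs and the failed record
    // only when the corresponding include flag is set.
    static Map<String, Object> errorContext(Throwable error,
                                            Map<String, String> connectorConfigs,
                                            Object failedRecord,
                                            boolean includeConfigs,
                                            boolean includeMessages) {
        Map<String, Object> context = new LinkedHashMap<>();
        context.put("exception", error.getClass().getName());
        context.put("message", String.valueOf(error.getMessage()));
        if (includeConfigs) {
            // May contain usernames or passwords, hence opt-in.
            context.put("configs", connectorConfigs);
        }
        if (includeMessages) {
            // The record payload may itself be sensitive, hence opt-in.
            context.put("record", failedRecord);
        }
        return context;
    }
}
```

With both flags left at their default of false, only the exception class and message would be reported in this sketch.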

...

```properties
# retry at most 100 times waiting up to 5 minutes between consecutive failures
errors.retries.limit=100
errors.retries.delay.max.ms=300000

# log error context along with application logs, but do not include configs and messages
errors.log.enable=true
errors.log.include.configs=false
errors.log.include.messages=false

# produce error context into the Kafka topic
errors.dlq.enable=true
errors.dlq.topic.name=my-connector-errors
errors.dlq.topic.partitions=25
errors.dlq.topic.replication.factor=3
errors.dlq.include.configs=true
errors.dlq.include.messages=true

# Tolerate all errors.
errors.tolerance.limit=-1
errors.tolerance.rate.limit=-1
errors.tolerance.rate.duration.ms=60000
```
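
The retry settings in this example (errors.retries.limit and errors.retries.delay.max.ms) can be read as a bounded retry loop with a capped delay. The sketch below is one possible interpretation, not the proposal's implementation: the doubling backoff, the initial delay parameter, and the names RetrySketch, backoffMs and callWithRetries are all assumptions made for illustration.

```java
import java.util.function.Supplier;

// Illustrative only: the doubling backoff and all names here are assumptions.
final class RetrySketch {

    // Delay before retry number `attempt` (0-based): doubles each time,
    // never exceeding maxDelayMs (cf. errors.retries.delay.max.ms).
    static long backoffMs(int attempt, long initialDelayMs, long maxDelayMs) {
        int shift = Math.min(attempt, 20); // bound the shift to avoid overflow
        return Math.min(initialDelayMs << shift, maxDelayMs);
    }

    // Runs the operation, retrying on failure at most retryLimit times
    // (cf. errors.retries.limit) before rethrowing the last exception.
    static <T> T callWithRetries(Supplier<T> operation, int retryLimit,
                                 long initialDelayMs, long maxDelayMs) {
        int retries = 0;
        while (true) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                if (retries >= retryLimit) {
                    throw e; // retries exhausted: surface the failure
                }
                try {
                    Thread.sleep(backoffMs(retries, initialDelayMs, maxDelayMs));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e; // stop retrying if the thread is interrupted
                }
                retries++;
            }
        }
    }
}
```

Under these assumptions, a call such as callWithRetries(op, 100, 1000, 300000) would wait 1 second after the first failure, 2 seconds after the second, and so on until the wait reaches the 5 minute cap.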

...

```properties
# retry up to 100 times waiting up to 5 minutes between consecutive failures
errors.retries.limit=100
errors.retries.delay.max.ms=300000

# log error context along with application logs, but do not include configs and messages
errors.log.enable=true
errors.log.include.configs=false
errors.log.include.messages=false

# produce error context into a secure Kafka topic
errors.dlq.enable=true
errors.dlq.topic.name=my-connector-errors
errors.dlq.topic.partitions=25
errors.dlq.topic.replication.factor=3
errors.dlq.include.configs=true
errors.dlq.include.messages=true
errors.dlq.producer.bootstrap.servers=secure-broker:9093
errors.dlq.producer.acks=0
errors.dlq.producer.security.protocol=SSL
errors.dlq.producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
errors.dlq.producer.ssl.truststore.password=test1234
errors.dlq.producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
errors.dlq.producer.ssl.keystore.password=test1234
errors.dlq.producer.ssl.key.password=test1234

# Tolerate all errors.
errors.tolerance.limit=-1
errors.tolerance.rate.limit=-1
errors.tolerance.rate.duration.ms=60000
```
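
The errors.dlq.producer.* keys in this example are meant to be handed to the producer with the prefix removed, so that errors.dlq.producer.bootstrap.servers becomes bootstrap.servers. A minimal sketch of that mapping follows; the class and method names are hypothetical, and plain string maps stand in for the real config objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: class and method names are assumptions.
final class DlqProducerConfigSketch {

    static final String PREFIX = "errors.dlq.producer.";

    // Returns only the entries that start with the prefix, with the prefix stripped.
    static Map<String, String> producerConfigs(Map<String, String> connectorConfigs) {
        Map<String, String> producer = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : connectorConfigs.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(PREFIX) && key.length() > PREFIX.length()) {
                producer.put(key.substring(PREFIX.length()), entry.getValue());
            }
        }
        return producer;
    }
}
```

Kafka's AbstractConfig provides originalsWithPrefix for the same kind of prefix stripping, which an actual implementation would likely use instead of a hand-written loop.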

...

Interceptors for Erroneous Records: Similar to ProducerInterceptor and ConsumerInterceptor, we could potentially add ErrorInterceptors too. But given that the handlers subsume most of this functionality, we decided not to provide this feature.

...