...

Currently, KIP-298 provides error handling in Kafka Connect that includes functionality such as retrying, logging, and sending errant records to a dead letter queue. However, the dead letter queue functionality from KIP-298 only supports error reporting within the context of the transform operations and the key, value, and header converter operations. After records are sent to the sink connector for processing in its put(...) method, there is no support for dead letter queue/error reporting functionality.

...

Thus, this proposal aims to extend KIP-298 by adding error reporting functionality within the context of put(...), even after records are sent to connector tasks, without adding redundant configuration.

...

private BiConsumer<SinkRecord, Throwable> errorReporter = null;
 
/**
 * Invoked by the framework to supply the task with an error reporter,
 * if the user has configured one for this connector.
 *
 * <p>Tasks can send problematic records back to the framework by
 * invoking {@link BiConsumer#accept(Object, Object)} on the reporter when an
 * exception is thrown while processing a record in {@link #put(Collection)}.
 *
 * @param reporter The error reporter to send errant records to.
 */
public void errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {
  // set the task's error reporter to the passed reporter
  this.errorReporter = reporter;
}
 
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record : records) {
    try {
      // attempt to send the record to the data sink
      process(record);
    } catch (Exception e) {
      if (errorReporter != null) {
        // send the errant record to the error reporter
        errorReporter.accept(record, e);
      } else {
        // no error reporter configured; fail the task as before
        throw new ConnectException("Error processing record", e);
      }
    }
  }
}

...

Config Option | Description | Default Value | Domain
--- | --- | --- | ---
errors.deadletterqueue.context.headers.enable | If true, multiple headers will be added to annotate the record with the error context. | false | Boolean
errors.deadletterqueue.topic.name | The name of the dead letter queue topic. If not set, this feature will be disabled. | "" | A valid Kafka topic name
errors.deadletterqueue.topic.replication.factor | Replication factor used to create the dead letter queue topic when it doesn't already exist. | 3 | [1 ... Short.MAX_VALUE]
errors.log.enable | Log the error context along with the other application logs. This context includes details about the failed operation, and the record which caused the failure. | false | Boolean
errors.log.include.messages | Whether to include the Connect record in every log. This is useful if users do not want records to be written to log files because they contain sensitive information, or are simply very large. If this property is disabled, Connect will still log some minimal information about the record (for example, the source partition and offset if it is a SourceRecord, and the Kafka topic and offset if it is a SinkRecord). | false | Boolean
errors.tolerance | Fail the task if we exceed the specified number of errors in the observed duration. | none | [none, all]

As mentioned above, the error reporter will use the same configurations as the dead letter queue in KIP-298 to avoid redundant configuration. There will be no additional configurations for the Producer and AdminClient under the hood, aside from the existing producer.- and admin.-prefixed configurations present in the worker configuration and the producer.override.- and admin.override.-prefixed configurations present in the connector configuration. Serialization of errant records will be done in the same manner as in KIP-298.
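For illustration, a standalone sink connector configuration might enable the dead letter queue, and with it the error reporter, as sketched below. The connector name, class, and topic names here are hypothetical; only the errors.* options are the ones described in the table above:

```properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
topics=orders

# Existing KIP-298 dead letter queue options, reused by the error reporter
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-orders
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
```

Because the error reporter reuses these options, a connector that already routes converter and transform failures to a dead letter queue needs no new configuration to also capture failures from put(...).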

...

  1. The framework calls errantRecordReporter(...) to supply the task with an error reporter.
  2. The framework calls the task's start(...) method.
  3. If the error reporter is not null and the connector is configured to use an error reporter, the task can send errant records to it within put(...).
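Putting the three steps together, the interaction can be sketched as a minimal, self-contained program. Note this is an illustration, not the framework's actual code: SinkRecord and MySinkTask below are simplified stand-ins for the real Connect types, and process(...) is a hypothetical sink operation that rejects null values:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

// Simplified stand-in (hypothetical) for org.apache.kafka.connect.sink.SinkRecord
class SinkRecord {
    final String value;
    SinkRecord(String value) { this.value = value; }
}

// Simplified stand-in (hypothetical) for a SinkTask implementation
class MySinkTask {
    private BiConsumer<SinkRecord, Throwable> errorReporter = null;

    // Step 1: the framework supplies the error reporter
    public void errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {
        this.errorReporter = reporter;
    }

    // Step 3: errant records are routed to the reporter during put(...)
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                process(record);
            } catch (Exception e) {
                if (errorReporter != null) {
                    errorReporter.accept(record, e);
                } else {
                    // no reporter configured; fail as before
                    throw new RuntimeException(e);
                }
            }
        }
    }

    // Hypothetical sink operation: fails on null values
    private void process(SinkRecord record) {
        if (record.value == null) {
            throw new IllegalArgumentException("null value");
        }
    }
}

public class ReporterDemo {
    public static void main(String[] args) {
        List<SinkRecord> failed = new ArrayList<>();
        MySinkTask task = new MySinkTask();
        // 1. framework hands the task a reporter before starting it
        task.errantRecordReporter((record, error) -> failed.add(record));
        // 2. task.start(...) would be called here
        // 3. the task reports the errant record instead of failing
        task.put(Arrays.asList(new SinkRecord("ok"), new SinkRecord(null)));
        System.out.println(failed.size()); // 1
    }
}
```

Only the record with the null value reaches the reporter; the healthy record is processed normally and the task keeps running instead of failing.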

...