
Status

Current state: Under Discussion

Discussion thread: Here

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, KIP-298 provides error handling in Kafka Connect that includes functionality such as retrying, logging, and sending errant records to a dead letter queue. However, the dead letter queue functionality from KIP-298 only supports error reporting within the contexts of the transform operations and the key, value, and header converter operations. After records are sent to the connector for processing, there is no support for dead letter queue or error reporting functionality.

As stated in KIP-298's rejected alternatives: "Write records that fail in the put() step of a sink connector to the dead letter queue: since sink connectors can choose to batch records in a put() method, it is not clear what errors are caused by what records (they might be because of records that were immediately written to put(), or by some previous records that were processed later). Also, there might be connection issues that are not handled by the connector, and simply bubbled up as IOException (for example). Effectively, errors sent back to the framework from the put() method currently do not have sufficient context to determine the problematic records (if any). Addressing these issues would need a separate KIP."

Thus, this proposal aims to extend KIP-298 and add error reporting functionality even after records are sent to connector tasks, without adding redundancy in configuration.

Public Interfaces

This feature will directly make changes to the SinkTask class with the addition of a new method.

Method

This KIP will add a setter method to the SinkTask class that will accept an error reporter object. Sink connectors that wish to use an error reporter can override this method to initialize an error reporter in their tasks. The method and usage will look something like the following:

import java.util.Collection;
import java.util.function.BiConsumer;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

private BiConsumer<SinkRecord, Throwable> errorReporter = null;

/**
 * Invoked by the framework to supply the task with an error reporter,
 * if the user has configured one for this connector.
 *
 * <p>Tasks can send problematic records back to the framework by
 * invoking {@link BiConsumer#accept(Object, Object)} on the reporter.
 *
 * @param reporter the error reporter to send errant records to
 */
public void errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {
  // set the task's error reporter to the reporter supplied by the framework
  this.errorReporter = reporter;
}

public void put(Collection<SinkRecord> records) {
  for (SinkRecord record : records) {
    try {
      // attempt to send the record to the data sink
      process(record);
    } catch (Exception e) {
      if (errorReporter != null) {
        // send the errant record to the error reporter
        errorReporter.accept(record, e);
      } else {
        // no reporter supplied (e.g. an older worker); fail the task as before
        throw new ConnectException(e);
      }
    }
  }
}
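
Note that in this sketch, put(...) falls back to failing the task when no reporter has been supplied; as described under Backwards Compatibility below, this is what lets the same task code run unmodified on older versions of Kafka Connect.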

Proposed Changes

Error Reporter Object

Kafka Connect will create a BiConsumer<SinkRecord, Throwable> object to serve as the error reporter. Along with the original errant sink record, the exception thrown will be sent to the error reporter to provide additional context. The only configurations exposed to the user and developer will be the following, taken from KIP-298:

Config Option | Description | Default Value | Domain
errors.deadletterqueue.context.headers.enable | If true, multiple headers will be added to annotate the record with the error context. | false | Boolean
errors.deadletterqueue.topic.name | The name of the dead letter queue topic. If not set, this feature will be disabled. | "" | A valid Kafka topic name
errors.deadletterqueue.topic.replication.factor | Replication factor used to create the dead letter queue topic when it doesn't already exist. | 3 | [1 ... Short.MAX_VALUE]
errors.log.enable | Log the error context along with the other application logs. This context includes details about the failed operation, and the record which caused the failure. | false | Boolean
errors.log.include.messages | Whether to include the Connect record in every log. This is useful if users do not want records to be written to log files because they contain sensitive information, or are simply very large. If this property is disabled, Connect will still log some minimal information about the record (for example, the source partition and offset if it is a SourceRecord, and the Kafka topic and offset if it is a SinkRecord). | false | Boolean
errors.tolerance | Fail the task if we exceed the specified number of errors in the observed duration. | none | [none, all]

As mentioned above, the error reporter will use the same configurations as the dead letter queue in KIP-298 to avoid redundant configuration. Aside from the existing producer. and admin. configurations present in the worker configuration, and the producer.override. and admin.override. configurations present in the connector configuration, no additional configurations will be exposed for the Producer and AdminClient used under the hood. Serialization of errant records will be done in the same manner as in KIP-298.
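
For illustration, a sink connector could enable the error reporter purely through these existing dead letter queue options; the connector name, class, and topic names in this sketch are placeholders:

name=my-sink-connector
connector.class=com.example.MySinkConnector
topics=my-topic
errors.tolerance=all
errors.deadletterqueue.topic.name=my-sink-connector-errors
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=false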

Order of Operations

The order of operations will proceed like so (a sketch of this sequence follows the list):

  1. The framework calls errantRecordReporter(...)
  2. The framework calls task.start(...)
  3. If the error reporter is not null and the connector is configured to use an error reporter, the task can send errant records to it within put(...)
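
The following is a minimal framework-side sketch of this sequence, assuming the errantRecordReporter(...) setter proposed above has been added to SinkTask; buildReporter(...) and runTask(...) are hypothetical names, not actual framework methods:

import java.util.Collection;
import java.util.Map;
import java.util.function.BiConsumer;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class OrderOfOperationsSketch {

  // hypothetical stand-in for the framework building a reporter
  // from the connector's dead letter queue configuration
  static BiConsumer<SinkRecord, Throwable> buildReporter(Map<String, String> connectorConfig) {
    return (record, error) ->
        System.err.println("errant record at offset " + record.kafkaOffset() + ": " + error);
  }

  static void runTask(SinkTask task, Map<String, String> taskConfig,
                      Collection<SinkRecord> records) {
    // 1. the framework supplies the reporter before starting the task
    task.errantRecordReporter(buildReporter(taskConfig));
    // 2. the framework starts the task
    task.start(taskConfig);
    // 3. the task can now send errant records to the reporter within put()
    task.put(records);
  }
}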

Synchrony

The error reporting functionality is intended to be synchronous. If a record is sent to the error reporter, processing of the next errant record in accept(...) will not begin until the producer successfully sends the errant record to Kafka. If the producer encounters an exception while sending the errant record to Kafka, the task will be killed; if error reporting is failing as well, it does not make sense to keep the connector running. While this synchronous design reduces throughput when many records are sent to the error reporter, a high number of errant records should not be the typical case in connector usage. Additionally, the slowdown gives the user a chance to notice a high rate of errant records before many more records are actually sent to the error reporter.
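
To make these semantics concrete, here is a minimal sketch of a synchronous reporter. It assumes a pre-configured KafkaProducer, and its serialize(...) method is a placeholder for the actual KIP-298 serialization path; the class and method names are illustrative only:

import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

public class SynchronousReporterSketch {
  private final KafkaProducer<byte[], byte[]> producer; // assumed pre-configured
  private final String dlqTopic; // from errors.deadletterqueue.topic.name

  public SynchronousReporterSketch(KafkaProducer<byte[], byte[]> producer, String dlqTopic) {
    this.producer = producer;
    this.dlqTopic = dlqTopic;
  }

  public BiConsumer<SinkRecord, Throwable> reporter() {
    return (record, error) -> {
      try {
        // block until Kafka acknowledges the errant record, so processing of
        // the next errant record cannot begin before this one is sent
        producer.send(new ProducerRecord<>(dlqTopic, serialize(record))).get();
      } catch (InterruptedException | ExecutionException e) {
        // if error reporting itself fails, fail the task rather than keep running
        throw new ConnectException("Failed to report errant record", e);
      }
    };
  }

  // placeholder for the KIP-298 serialization of errant records
  private byte[] serialize(SinkRecord record) {
    return String.valueOf(record.value()).getBytes();
  }
}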

Metrics

The following metrics will be used to monitor the error reporter:

  • the number of records sent to the error reporter
  • the number of producer retries when sending errant records to Kafka

Compatibility, Deprecation, and Migration Plan

Backwards Compatibility

A major driving force and main advantage of this design is its backwards compatibility. The idea is for the addition of the error reporter to be as seamless as possible when running on older versions of Kafka Connect. If a user deploys a connector on an older version of Kafka Connect, errantRecordReporter(...) will act as a no-op; thus, the connector's error reporter variable will remain null and the task can proceed accordingly. Additionally, to ensure that users can deploy connectors on any version of Kafka Connect, a Java standard library interface, BiConsumer, was used to implement the error reporter rather than creating a new interface specific to this functionality.

Rejected Alternatives

  1. New library or interface: creating a separate library like connect-reporter would cause redundancy in configuration, and creating any new interface or API would limit backwards compatibility. Deploying a connector on an old version of Connect would not be a seamless integration if a new interface were introduced.
  2. Exposing ErrorReporter and ProcessingContext as public APIs: the main issue with this design is that the API exposes packages with runtime in their names, and package names cannot be changed.
  3. Labeling as a dead letter queue
  4. Asynchronous error reporting: this introduces multiple considerations at each step of the error reporting, which complicates the design and implementation; the increase in throughput does not outweigh the added complication.
  5. Batch error reporting: this would introduce the need to keep track of ordering, and the increase in throughput does not seem to outweigh the added complication.