Status

Current state: Adopted

Discussion thread: Here

Vote thread: Here

...

This KIP will add a getter method to the SinkTaskContext interface that returns an error reporter object; by default this method returns null. Sink connectors that wish to use an error reporter can call this method within their tasks.

...

Code Block
public interface SinkTaskContext {
...    
    /**
     * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}
     * objects passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
     * the sink task will receive a {@link Future} that the task can optionally use to wait until
     * the failed record and exception have been written to Kafka via Connect's DLQ. Note that
     * the result of this method may be null if this connector has not been configured with a DLQ.
     *
     * <p>This method was added in Apache Kafka 2.6. Sink tasks that use this method but want to
     * maintain backward compatibility so they can also be deployed to older Connect runtimes
     * should guard the call to this method with a try-catch block, since calling this method will result in a
     * {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the sink connector is deployed to
     * Connect runtimes older than Kafka 2.6. For example:
     * <pre>
     *     ErrantRecordReporter reporter;
     *     try {
     *         reporter = context.errantRecordReporter();
     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
     *         reporter = null;
     *     }
     * </pre>
     *
     * @return the reporter; null if no error reporter has been configured for the connector
     * @since 2.6
     */
    default ErrantRecordReporter errantRecordReporter() {
        return null;
    }
}

Interface

This KIP will add an ErrantRecordReporter interface that contains one method, report(SinkRecord record, Throwable error).
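As a rough illustration of the shape described above, the sketch below declares the single report method with the Future<Void> return type mentioned in the SinkTaskContext Javadoc. It is not the Connect source: the SinkRecord class here is a hypothetical stand-in so the example compiles without the Connect API, and RecordingReporter is an assumed in-memory implementation of the kind a connector author might use in unit tests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for org.apache.kafka.connect.sink.SinkRecord.
class SinkRecord {
    final String value;

    SinkRecord(String value) {
        this.value = value;
    }
}

// Shape of the interface as described in this KIP: one report method.
interface ErrantRecordReporter {
    Future<Void> report(SinkRecord record, Throwable error);
}

// Hypothetical in-memory reporter for tests; it is not part of this KIP.
class RecordingReporter implements ErrantRecordReporter {
    final List<SinkRecord> reported = new ArrayList<>();

    @Override
    public Future<Void> report(SinkRecord record, Throwable error) {
        // Remember the record and return a future that is already complete.
        reported.add(record);
        return CompletableFuture.completedFuture(null);
    }
}
```

A task under test could be handed a RecordingReporter in place of the reporter obtained from the context, and the test could then inspect the reported list.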

...

Code Block
private ErrantRecordReporter reporter;

@Override
public void start(Map<String, String> props) {
  ...
  try {
    reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
  } catch (NoSuchMethodError | NoClassDefFoundError e) {
    // Will occur in Connect runtimes earlier than 2.6
    reporter = null;
  }
}

@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record: records) {
    try {
      // attempt to send record to data sink
      process(record);
    } catch(Exception e) {
      if (reporter != null) {
        // Send errant record to error reporter
        Future<Void> future = reporter.report(record, e);
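        // Optionally wait until the failure has been recorded in Kafka
        try {
          future.get();
        } catch (InterruptedException | ExecutionException reportError) {
          // Future.get() throws checked exceptions, so surface them as a task failure
          throw new ConnectException("Failed to report errant record", reportError);
        }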
      } else {
        // There's no error reporter, so fail
        throw new ConnectException("Failed on record", e);
      }
    }
  }
}

...

The error reporting functionality is asynchronous, although tasks can use the resulting future to wait for the record and exception to be written to Kafka.

Guarantees

The Connect framework also guarantees that by the time preCommit(...) is called on the task, the error reporter will have successfully and fully recorded all reported records with offsets at or before those passed to the preCommit method. Sink task implementations that need stricter guarantees can use the futures returned by report(...) to wait for confirmation that reported records have been successfully recorded.
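One way a task could get a stricter guarantee than the preCommit(...) one is to collect the futures returned by report(...) and wait on all of them before put(...) returns. The sketch below is illustrative only. Reporter, BatchingSink, and the use of String in place of SinkRecord are hypothetical stand-ins so the example runs without the Connect API; a real task would use ErrantRecordReporter and SinkRecord.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for ErrantRecordReporter, using String instead of SinkRecord.
interface Reporter {
    Future<Void> report(String record, Throwable error);
}

// Hypothetical sink that reports bad records and waits for every report per batch.
class BatchingSink {
    private final Reporter reporter;
    final List<String> written = new ArrayList<>();

    BatchingSink(Reporter reporter) {
        this.reporter = reporter;
    }

    // Stand-in for writing to the external system; rejects empty records.
    private void process(String record) {
        if (record.isEmpty()) {
            throw new IllegalArgumentException("empty record");
        }
        written.add(record);
    }

    // Processes a batch and returns how many records were reported as errant.
    int put(Collection<String> records) {
        List<Future<Void>> pending = new ArrayList<>();
        for (String record : records) {
            try {
                process(record);
            } catch (RuntimeException e) {
                // Reporting is asynchronous, so keep the future instead of blocking here.
                pending.add(reporter.report(record, e));
            }
        }
        // Block once, after the whole batch, until every report has completed.
        for (Future<Void> future : pending) {
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException("Failed to report errant record", e);
            }
        }
        return pending.size();
    }
}

class Demo {
    public static void main(String[] args) {
        // A reporter whose futures are already complete, for demonstration only.
        Reporter reporter = (record, error) -> CompletableFuture.completedFuture(null);
        BatchingSink sink = new BatchingSink(reporter);
        int reported = sink.put(List.of("a", "", "b"));
        System.out.println("reported=" + reported + " written=" + sink.written);
    }
}
```

Waiting once per batch rather than once per record keeps the good records flowing while the reports are in flight, at the cost of holding the batch open until the slowest report finishes.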

Metrics

No new metrics will be added for the error reporter. Instead, the metrics detailed in KIP-298 for the dead letter queue will be used for the error reporter.

Error Tolerance

The Errant Record Reporter will adhere to the existing DLQ error tolerance functionality. For example, if `errors.tolerance` is set to `all`, all errors will be tolerated; if it is set to `none`, the Errant Record Reporter will throw an exception stating that the tolerance has been exceeded. Even if the developer catches and swallows an exception thrown during `report(...)`, the task is guaranteed to be killed after `put(...)` completes.

FileSinkTask Example

This KIP will update the FileSinkTask example to use the new API, to demonstrate proper error handling with the ability to use it on older versions of Kafka Connect.

...