Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
private ErrantRecordReporter reporter;

@Override
public void start(Map<String, String> props) {
  ...
  try {
    reporter = context.failedRecordReportererrantRecordReporter(); // may be null if DLQ not enabled
  } catch (NoSuchMethodException | NoClassDefFoundError e) {
    // Will occur in Connect runtimes earlier than 2.6
    reporter = null;
  }
}

@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record: records) {
    try {
      // attempt to send record to data sink
      process(record);
    } catch(Exception e) {
      if (reporter != null) {
        // Send errant record to error reporter
        Future<Void> future = reporter.acceptreport(record, e);
        // Optionally wait till the failure's been recorded in Kafka
        future.get();
      } else {
        // There's no error reporter, so fail
        throw new ConnectException("Failed on record", e);
      }
    }
  }
}

...

No new metrics will be added for the error reporter. Instead, the metrics detailed in KIP-298 for the dead letter queue will be used for the error reporter.

Errors Tolerance

The Errant Record Reporter will adhere to the existing DLQ error tolerance functionality. For example, if `errors.tolerance` is set to `ALL`, all errors will be tolerated; if the property is set to `NONE`, then the Errant Record Reporter will throw an exception detailing that the tolerance has been exceeded. Even if the developer chooses to catch and swallow any of these exceptions thrown during `report(...)`, the task is guaranteed to be killed following the completion of `put(...)`.

FileSinkTask Example

This KIP will update the FileSinkTask example to use the new API, to demonstrate proper error handling with the ability to use it on older versions of Kafka Connect.

...