Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
// Errant record reporter assigned in start(); null when the runtime provides none
// (DLQ not enabled, or a Connect runtime that predates the reporter API).
// Callers must null-check before use.
private ErrantRecordReporter reporter;

/**
 * Starts the task and looks up the runtime's errant record reporter.
 *
 * <p>After this method returns, {@code reporter} is {@code null} either when the
 * runtime returned {@code null} or when the lookup failed with a linkage error.
 *
 * @param props the configuration properties passed to the task
 */
@Override
public void start(Map<String, String> props) {
  ...
  try {
    reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
  } catch (NoSuchMethodError | NoClassDefFoundError e) {
    // Will occur in Connect runtimes earlier than 2.6, where the method or the
    // ErrantRecordReporter class does not exist. The JVM reports that as a linkage
    // Error (NoSuchMethodError), not the checked reflection NoSuchMethodException.
    reporter = null;
  }
}

/**
 * Sends each record to the data sink, routing failures to the errant record
 * reporter when one is available.
 *
 * <p>When {@code process} throws for a record and a reporter is present, the record
 * and the exception are handed to the reporter and this method blocks until the
 * report completes. When no reporter is present, the failure is rethrown.
 *
 * @param records the records to process
 * @throws ConnectException if a record fails and there is no reporter, or if
 *     reporting the failed record itself fails or is interrupted
 */
@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record : records) {
    try {
      // attempt to send record to data sink
      process(record);
    } catch (Exception e) {
      if (reporter != null) {
        // Send errant record to error reporter
        Future<Void> future = reporter.report(record, e);
        try {
          // Optionally wait till the failure's been recorded in Kafka
          future.get();
        } catch (InterruptedException interrupted) {
          // Restore the interrupt flag so the caller can observe the interruption
          Thread.currentThread().interrupt();
          throw new ConnectException("Interrupted while reporting errant record", interrupted);
        } catch (java.util.concurrent.ExecutionException reportFailure) {
          // The report itself failed; surface it rather than silently dropping the record
          throw new ConnectException("Failed to report errant record", reportFailure);
        }
      } else {
        // There's no error reporter, so fail
        throw new ConnectException("Failed on record", e);
      }
    }
  }
}

...