Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
public interface SinkTaskContext {
...    
	/**
    * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}
 	* passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
 	* the sink task will receive a {@link Future} that the task can optionally use to wait until
 	* the failed record and exception have been written to Kafka via Connect's DLQ. Note that
 	* the result of this method may be null if this connector has not been configured with a DLQ.
 	*
 	* <p>This method was added in Apache Kafka 2.6. Sink tasks that use this method but want to
 	* maintain backward compatibility so they can also be deployed to older Connect runtimes
 	* should guard the call to this method with a try-catch block, since calling this method will result in a
 	* {@link NoSuchMethodException} or {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes
 	* Connect runtimes older than Kafka 2.6. For example:
 	* <pre>
 	*     BiFunction<SinkTask, Throwable, Future<Void>> ErrantRecordReporter reporter;
 	*     try {
 	*         reporter = context.failedRecordReporter();
 	*     } catch (NoSuchMethodError | NoClassDefFoundError e) {
 	*         reporter = null;
 	*     }
 	* </pre>
 	*
 	* @return the reporter function; null if no error reporter has been configured for the connector
 	* @since 2.6
 	*/
 	BiFunction<SinkTask, Throwable, Future<Void>> ErrantRecordReporter failedRecordReporter();
}

...

The usage will look like the following:

Code Block
private BiFunction<SinkTask, Throwable, Future<Void> ErrantRecordReporter reporter;

@Override
public void start(Map<String, String> props) {
  ...
  try {
    reporter = context.failedRecordReporter(); // may be null if DLQ not enabled
  } catch (NoSuchMethodException | NoClassDefFoundError e) {
    // Will occur in Connect runtimes earlier than 2.6
    reporter = null;
  }
}

@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record: records) {
    try {
      // attempt to send record to data sink
      process(record);
    } catch(Exception e) {
      if (reporter != null) {
        // Send errant record to error reporter
        Future<Void> future = reporter.accept(record, e);
        // Optionally wait till the failure's been recorded in Kafka
        future.get();
      } else {
        // There's no error reporter, so fail
        throw new ConnectException("Failed on record", e);
      }
    }
  }
}

...

Code Block
/**
 * Component that the sink task can use as it {@link SinkTask#put(Collection<SinkRecord>)}
 * Reporter of problematic records and the corresponding problems.
 *
 * @since 2.6
 */
public interface ErrantRecordReporter {

    /**
     * Report a problematic record and the corresponding error to be written to the sink
     * connector's dead letter queue (DLQ).
     *
     * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future} for the
     * {@link RecordMetadata} that will be assigned to the record in the DLQ topic. Invoking
     * {@link java.util.concurrent.Future#get() get()} on this future will block until the
     * record has been written and then return the metadata for the record
     * or throw any exception that occurred while sending the record.
     * If you want to simulate a simple blocking call you can call the <code>get()</code> method
     * immediately.
     *
     * @param record the problematic record; may not be null
     * @param error  the error capturing the problem with the record; may not be null
     * @return a future that can be used to block until the record and error are written
     *         to the DLQ
     * @since 2.6
     */
    default Future<RecordMetadata> report(SinkRecord record, Throwable error) {
        return report(record, error, null);
    }
}


Proposed Changes

Reporting

...