...

Code Block
public interface SinkTaskContext {
...    
    /**
     * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}s
     * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
     * the sink task will receive a {@link Future} that the task can optionally use to wait until
     * the failed record and exception have been written to Kafka via Connect's DLQ. Note that
     * the result of this method may be null if this connector has not been configured with a DLQ.
     *
     * <p>This method was added in Apache Kafka 2.6. Sink tasks that use this method but want to
     * maintain backward compatibility so they can also be deployed to older Connect runtimes
     * should guard the call to this method with a try-catch block, since calling this method will
     * result in a {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the sink connector
     * is deployed to Connect runtimes older than Kafka 2.6. For example:
     * <pre>
     *     BiFunction<SinkRecord, Throwable, Future<Void>> reporter;
     *     try {
     *         reporter = context.failedRecordReporter();
     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
     *         reporter = null;
     *     }
     * </pre>
     *
     * @return the reporter function; null if no error reporter has been configured for the connector
     * @since 2.6
     */
    BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
}

...

Code Block
private BiFunction<SinkRecord, Throwable, Future<Void>> reporter;

@Override
public void start(Map<String, String> props) {
  ...
  try {
    reporter = context.failedRecordReporter(); // may be null if DLQ not enabled
  } catch (NoSuchMethodError | NoClassDefFoundError e) {
    // Will occur in Connect runtimes earlier than 2.6
    reporter = null;
  }
}

@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record: records) {
    try {
      // attempt to send record to data sink
      process(record);
    } catch(Exception e) {
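      if (reporter != null) {
        // Send errant record to error reporter
        Future<Void> future = reporter.apply(record, e);
        try {
          // Optionally wait until the failure has been recorded in Kafka
          future.get();
        } catch (InterruptedException | ExecutionException reportingError) {
          // Future.get() throws checked exceptions, which put() cannot propagate
          throw new ConnectException("Failed to report errant record", reportingError);
        }
      } else {
        // There's no error reporter, so fail
        throw new ConnectException("Failed on record", e);
      }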
    }
  }
}

...

Code Block
/**
 * Component that the sink task can use, while processing records in
 * {@link SinkTask#put(Collection)}, to report problematic records and the corresponding problems.
 *
 * @since 2.6
 */
public interface ErrantRecordReporter {

    /**
     * Report a problematic record and the corresponding error to be written to the sink
     * connector's dead letter queue (DLQ).
     *
     * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future} for the
     * {@link RecordMetadata} that will be assigned to the record in the DLQ topic. Invoking
     * {@link java.util.concurrent.Future#get() get()} on this future will block until the
     * record has been written and then return the metadata for the record
     * or throw any exception that occurred while sending the record.
     * If you want to simulate a simple blocking call you can call the <code>get()</code> method
     * immediately.
     *
     * @param record the problematic record; may not be null
     * @param error  the error capturing the problem with the record; may not be null
     * @return a future that can be used to block until the record and error are written
     *         to the DLQ
     * @since 2.6
     */
    Future<RecordMetadata> report(SinkRecord record, Throwable error);
}

...

The error reporter interface will have one asynchronous reporting method. The method will accept the errant record and the exception thrown while processing it, and return a Future<RecordMetadata>. To handle the case where the connector is deployed to an older version of Apache Kafka (AK), the developer must catch the resulting NoClassDefFoundError or NoSuchMethodError and set the reporter to null. When processing errant records, the developer should then check whether the reporter is null; if it is, the developer should wrap the original exception in a ConnectException and throw it. Records passed to the error reporter should be considered processed with respect to offset handling, for example when computing the offsets to commit in preCommit(...).

The error reporter will use the same configurations as the dead letter queue in KIP-298 to avoid redundant configuration. The underlying Producer and AdminClient will take no additional configurations beyond the existing "producer." and "admin." prefixed configurations in the worker configuration and the "producer.override." and "admin.override." prefixed configurations in the connector configuration. Serialization of errant records will be done in the same manner as in KIP-298.
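As an illustration of the configuration reuse described above, a sink connector configuration might look like the following fragment. The `errors.*` properties are the dead letter queue settings introduced by KIP-298; the connector name and topic name are hypothetical, and the `producer.override.` line only takes effect if the worker's client configuration override policy permits it.

```properties
# Hypothetical connector; name and topic are placeholders for illustration
name=example-sink
# Dead letter queue settings from KIP-298, reused by the error reporter
errors.tolerance=all
errors.deadletterqueue.topic.name=example-sink-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
# Optional per-connector producer override (requires a permissive
# connector.client.config.override.policy on the worker)
producer.override.max.in.flight.requests.per.connection=1
```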

...

The error reporting functionality is asynchronous by design, but it can be made synchronous if desired. By default, reporting one errant record does not block processing of the next: the task does not wait for the prior errant record to be written successfully. If the developer prefers synchronous behavior, they can block processing of the next record by calling future.get(). To avoid error records being written out of order (for example, due to retries), the developer should set max.in.flight.requests.per.connection=1 for the producer that writes error records. If the developer determines that ordering is not important and wants higher throughput, they can increase this value.
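The difference between the two styles can be sketched as follows. This is a minimal, self-contained illustration, not the Connect API: `Reporter` is a hypothetical stand-in for the error reporter, records are plain strings, and the in-memory reporter in `main` completes its futures immediately. Waiting on all futures at the end of the batch is one possible asynchronous pattern, assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ReportingModes {

    /** Hypothetical stand-in for the error reporter; NOT the Connect API. */
    interface Reporter {
        Future<Void> report(String record, Throwable error);
    }

    /** Asynchronous style: submit every report first, then wait for all of them once. */
    static int reportAsync(Reporter reporter, List<String> errantRecords)
            throws InterruptedException, ExecutionException {
        List<Future<Void>> pending = new ArrayList<>();
        for (String record : errantRecords) {
            // Does not block: the next record is handled while this report is in flight
            pending.add(reporter.report(record, new IllegalStateException("failed: " + record)));
        }
        for (Future<Void> future : pending) {
            future.get(); // wait once, after the whole batch has been submitted
        }
        return pending.size();
    }

    /** Synchronous style: block on each report before touching the next record. */
    static int reportSync(Reporter reporter, List<String> errantRecords)
            throws InterruptedException, ExecutionException {
        int reported = 0;
        for (String record : errantRecords) {
            reporter.report(record, new IllegalStateException("failed: " + record)).get();
            reported++;
        }
        return reported;
    }

    public static void main(String[] args) throws Exception {
        List<String> written = new ArrayList<>();
        // In-memory reporter that records what it was given and completes immediately
        Reporter inMemory = (record, error) -> {
            written.add(record);
            return CompletableFuture.<Void>completedFuture(null);
        };
        System.out.println(reportAsync(inMemory, Arrays.asList("a", "b")));
        System.out.println(reportSync(inMemory, Arrays.asList("c")));
        System.out.println(written);
    }
}
```

With a real producer behind the reporter, the synchronous style trades throughput for a simple guarantee that each error record is written before the next record is processed.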

...

This proposal is backward compatible such that existing sink connector implementations will continue to work as before. Developers can optionally modify sink connector implementations to use the new error reporting feature, yet still easily support installing and running those connectors in older Connect runtimes where the feature does not exist.

Moreover, to ensure that new connectors using this new method and interface can still be deployed on older versions of Kafka Connect, the developer should use a try-catch block to catch the NoSuchMethodError or NoClassDefFoundError thrown by a worker running an older version of AK.

...