...

This feature will directly make changes to the SinkTaskContext interface with the addition of a new method.

Method

This KIP will add a getter method to the SinkTaskContext interface that will return an error reporter object. Sink connectors that wish to use an error reporter can call this method within their tasks.

The method and usage will look something like the following:

...

Code Block
public interface SinkTaskContext {
...    
    /**
     * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}s
     * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
     * the sink task will receive a {@link Future} that the task can optionally use to wait until
     * the failed record and exception have been written to Kafka via Connect's DLQ. Note that
     * the result of this method may be null if this connector has not been configured with a DLQ.
     *
     * <p>This method was added in Apache Kafka 2.9. Sink tasks that use this method but want to
     * maintain backward compatibility so they can also be deployed to older Connect runtimes
     * should guard the call to this method with a try-catch block, since calling this method
     * will result in a {@link NoSuchMethodError} when the sink connector is deployed to
     * Connect runtimes older than Kafka 2.9. For example:
     * <pre>
     *     BiFunction<SinkRecord, Throwable, Future<Void>> reporter;
     *     try {
     *         reporter = context.failedRecordReporter();
     *     } catch (NoSuchMethodError e) {
     *         reporter = null;
     *     }
     * </pre>
     *
     * @return the reporter function; null if no error reporter has been configured for the connector
     * @since 2.9
     */
    BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
}


The usage will look like the following:

Code Block
private BiFunction<SinkRecord, Throwable, Future<Void>> reporter;

@Override
public void start(Map<String, String> props) {
  ...
  try {
    reporter = context.failedRecordReporter(); // may be null if DLQ not enabled
  } catch (NoSuchMethodError e) {
    // Will occur in Connect runtimes earlier than 2.9
    reporter = null;
  }
}

@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record: records) {
    try {
      // attempt to send record to data sink
      process(record);
    } catch (Exception e) {
      if (reporter != null) {
        // Send the errant record to the error reporter
        Future<Void> future = reporter.apply(record, e);
        // Optionally wait until the failure has been recorded in Kafka
        try {
          future.get();
        } catch (InterruptedException | ExecutionException ex) {
          throw new ConnectException("Failed to report errant record", ex);
        }
      } else {
        // There's no error reporter, so fail
        throw new ConnectException("Failed on record", e);
      }
    }
  }
}


Proposed Changes

Error Reporter Object

Kafka Connect will create a BiFunction<SinkRecord, Throwable, Future<Void>> object as the error reporter object. Along with the original errant sink record, the exception thrown will be sent to the error reporter to provide additional context. The only configurations exposed to the user and developer will be the following, taken from KIP-298:

...
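
For concreteness, here is a minimal sketch of what the framework-side reporter object might look like. The class name DeadLetterQueueReporter, the dlqTopic field, and the byte[] pass-through in place of real serialization are illustrative assumptions, not part of this KIP:

Code Block
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.sink.SinkRecord;

public class DeadLetterQueueReporter implements BiFunction<SinkRecord, Throwable, Future<Void>> {

  private final KafkaProducer<byte[], byte[]> producer; // configured from the KIP-298 DLQ settings
  private final String dlqTopic;

  public DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> producer, String dlqTopic) {
    this.producer = producer;
    this.dlqTopic = dlqTopic;
  }

  @Override
  public Future<Void> apply(SinkRecord record, Throwable error) {
    // A real implementation would serialize the record and attach the exception as
    // context in the same manner as KIP-298; sketched here as a byte[] pass-through
    ProducerRecord<byte[], byte[]> errantRecord =
        new ProducerRecord<>(dlqTopic, (byte[]) record.key(), (byte[]) record.value());
    CompletableFuture<Void> result = new CompletableFuture<>();
    producer.send(errantRecord, (metadata, exception) -> {
      if (exception != null) {
        result.completeExceptionally(exception);
      } else {
        result.complete(null);
      }
    });
    return result;
  }
}

The task receives the returned Future and can optionally wait on it, as shown in the usage example above.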

As mentioned above, the error reporter will use the same configurations as the dead letter queue in KIP-298 to avoid redundant configuration. There will be no additional configurations for the producer and admin client used under the hood, aside from the existing producer. and admin. configurations in the worker configuration and the producer.override. and admin.override. configurations in the connector configuration. Serialization of the errant records will be done in the same manner as in KIP-298.
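
For example, a connector configuration might enable the dead letter queue with the existing KIP-298 properties and tune the underlying producer through the existing override prefix; the topic name below is illustrative:

Code Block
errors.deadletterqueue.topic.name=my-connector-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
producer.override.acks=all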

Order of Operations

The order of operations will proceed like so:

...

Once configured to use an error reporter, the task can send errant records to it within put(...)

Synchrony

The error reporting functionality is intended to be synchronous. If a record is sent to the error reporter, processing of the next errant record in apply(...) will not begin until the producer successfully sends the errant record to Kafka. If the producer encounters an exception while sending the errant record to Kafka, the task will be killed; if error reporting is failing as well, it does not make sense to keep the connector running. While this synchronous design will cause a throughput hit when many records are sent to the error reporter, a high rate of errant records should not be the typical case in connector usage. Additionally, this throughput hit gives the user a chance to notice the high rate of errant records before many more records are actually sent to the error reporter.
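
Under this design, the apply(...) method of the reporter sketched earlier would block on the producer send rather than complete the future from a callback. A hedged sketch of that variant, where serialize(...) is a hypothetical helper standing in for the KIP-298 serialization step:

Code Block
@Override
public Future<Void> apply(SinkRecord record, Throwable error) {
  try {
    // Block until the errant record has been acknowledged by Kafka, so processing
    // of the next errant record cannot begin until this one is persisted
    producer.send(serialize(record, error)).get();
    return CompletableFuture.completedFuture(null);
  } catch (Exception e) {
    // Error reporting itself is failing, so fail the task instead of continuing
    throw new ConnectException("Failed to write errant record to the DLQ", e);
  }
}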

...