Status

Current state: Adopted

Discussion thread: Here

Vote thread: Here

JIRA: KAFKA-9971

...

This feature will add a new method to the SinkTaskContext class and introduce a new interface, ErrantRecordReporter.

Method

This KIP will add a getter method to the SinkTaskContext class that will return an error reporter object; by default, this method will return null. Sink connectors that wish to use an error reporter can call this method within their tasks.

The method will look like the following:

Code Block
public interface SinkTaskContext {
    ...
    /**
     * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}
     * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
     * the sink task will receive a {@link Future} that the task can optionally use to wait until
     * the failed record and exception have been written to Kafka via Connect's DLQ. Note that
     * the result of this method may be null if this connector has not been configured with a DLQ.
     *
     * <p>This method was added in Apache Kafka 2.6. Sink tasks that use this method but want to
     * maintain backward compatibility so they can also be deployed to older Connect runtimes
     * should guard the call to this method with a try-catch block, since calling this method will result in a
     * {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the sink connector is deployed to
     * Connect runtimes older than Kafka 2.6. For example:
     * <pre>
     *     ErrantRecordReporter reporter;
     *     try {
     *         reporter = context.errantRecordReporter();
     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
     *         reporter = null;
     *     }
     * </pre>
     *
     * @return the reporter; null if no error reporter has been configured for the connector
     * @since 2.6
     */
    default ErrantRecordReporter errantRecordReporter() {
        return null;
    }
}
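The null-returning default is what lets existing SinkTaskContext implementations keep compiling and running unchanged. The following self-contained sketch illustrates that mechanism with hypothetical stand-in types: Reporter, TaskContext, LegacyContext, and DlqContext are illustrative names, not the real Connect classes, and records are modeled as Strings. A context written before the method existed simply inherits the default, while one backed by a DLQ overrides it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-ins for ErrantRecordReporter and SinkTaskContext; NOT the real Connect API.
class DefaultMethodSketch {

    // Stand-in reporter; records are modeled as Strings for brevity.
    interface Reporter {
        Future<Void> report(String record, Throwable error);
    }

    // Stand-in context interface with the same null-returning default as the proposal.
    interface TaskContext {
        default Reporter errantRecordReporter() {
            return null;
        }
    }

    // Written before the method existed: compiles unchanged and inherits the null default.
    static class LegacyContext implements TaskContext {
    }

    // A context for a connector with a DLQ overrides the default and returns a reporter.
    static class DlqContext implements TaskContext {
        @Override
        public Reporter errantRecordReporter() {
            // Stub reporter whose future is already complete; a real one would write to the DLQ.
            return (record, error) -> CompletableFuture.completedFuture(null);
        }
    }
}
```

Because the default lives in the interface, no existing implementer has to change; only contexts that actually have a DLQ need to override it.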

Interface

This KIP will add an ErrantRecordReporter interface that contains one method, report(SinkRecord record, Throwable error).

The interface will look like the following:

Code Block
/**
 * Component that a {@link SinkTask} can use to report problematic records (and the corresponding
 * problems) as it writes them through {@link SinkTask#put(java.util.Collection)}.
 *
 * @since 2.6
 */
public interface ErrantRecordReporter {

    /**
     * Report a problematic record and the corresponding error to be written to the sink
     * connector's dead letter queue (DLQ).
     *
     * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future}. 
     * Invoking {@link java.util.concurrent.Future#get() get()} on this future will block until the
     * record has been written, or throw any exception that occurred while sending the record.
     * If you want to simulate a simple blocking call you can call the <code>get()</code> method
     * immediately.
     *
     * @param record the problematic record; may not be null
     * @param error  the error capturing the problem with the record; may not be null
     * @return a future that can be used to block until the record and error are written
     *         to the DLQ
     * @since 2.6
     */
    Future<Void> report(SinkRecord record, Throwable error);
}
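To make the asynchronous contract concrete, the following is a minimal sketch of a reporter that returns immediately and completes the future once a background thread has recorded the failure. It rests on stated assumptions: InMemoryReporter is a hypothetical class, records are modeled as Strings, and an in-memory list stands in for the DLQ topic, whereas the real runtime writes the record to Kafka with a producer.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical reporter; the in-memory list stands in for the DLQ topic.
class InMemoryReporter implements AutoCloseable {
    // A single writer thread keeps reports in submission order.
    private final ExecutorService writer = Executors.newSingleThreadExecutor();
    private final List<String> deadLetters = new CopyOnWriteArrayList<>();

    // Returns immediately; the "write" happens on the writer thread.
    Future<Void> report(String record, Throwable error) {
        Objects.requireNonNull(record, "record may not be null");
        Objects.requireNonNull(error, "error may not be null");
        return writer.submit(() -> {
            deadLetters.add(record + " | " + error.getMessage());
            return null; // Future<Void>: completion is the only signal
        });
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    @Override
    public void close() {
        writer.shutdown();
    }
}
```

Calling get() on the returned future immediately gives the "simple blocking call" described in the Javadoc; ignoring it leaves the write to complete in the background.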

Proposed Changes

Reporting

The error reporter interface will have one asynchronous reporting method. The method will accept the errant record and the exception thrown while processing it, and return a Future<Void>. To handle the case where the connector is deployed to an older version of AK, the developer must catch the NoClassDefFoundError or NoSuchMethodError that results from calling errantRecordReporter() to get the error reporter, and set the reporter to null. Then, when processing errant records, the developer should check whether the reporter is null; if it is, the developer should wrap the original exception in a ConnectException and throw it. Records passed to the error reporter should be considered processed with respect to handling offsets, for example when processing current offsets in preCommit(...).

The error reporter will use the same configurations as the dead letter queue in KIP-298 to avoid redundant configuration. The producer and admin client that it uses under the hood will have no additional configurations beyond the existing `producer.` and `admin.` configurations in the worker configuration and the `producer.override.` and `admin.override.` configurations in the connector configuration. Errant records will be serialized in the same manner as in KIP-298.
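As an illustration, the KIP-298 settings that the error reporter reuses are ordinary sink connector properties. The sketch below gathers them in a Java map; the helper name DlqConfigSketch.dlqProperties and the replication factor are illustrative assumptions, and whether a deployment wants errors.tolerance=all depends on how failures should be handled.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sink connector properties enabling the KIP-298 DLQ used by the error reporter.
class DlqConfigSketch {
    static Map<String, String> dlqProperties(String dlqTopic) {
        Map<String, String> props = new HashMap<>();
        // Tolerate failed records instead of failing the task on the first error.
        props.put("errors.tolerance", "all");
        // Naming a DLQ topic is what configures the DLQ for the connector.
        props.put("errors.deadletterqueue.topic.name", dlqTopic);
        // Illustrative value; choose per cluster.
        props.put("errors.deadletterqueue.topic.replication.factor", "3");
        // Attach error context to the DLQ records as headers.
        props.put("errors.deadletterqueue.context.headers.enable", "true");
        return props;
    }
}
```

No reporter-specific keys appear here, which reflects the design choice of reusing the DLQ configuration rather than adding new settings.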

Example Usage

The following is an example of how a sink task can use the error reporter and support connectors being deployed in earlier versions of the Connect runtime:

Code Block
private ErrantRecordReporter reporter;

@Override
public void start(Map<String, String> props) {
  ...
  try {
    reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
  } catch (NoSuchMethodError | NoClassDefFoundError e) {
    // Will occur in Connect runtimes earlier than 2.6
    reporter = null;
  }
}

@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record : records) {
    try {
      // attempt to send record to data sink (connector-specific logic)
      process(record);
    } catch (Exception e) {
      if (reporter != null) {
        // Send errant record to error reporter
        Future<Void> future = reporter.report(record, e);
        // Optionally wait till the failure's been recorded in Kafka;
        // Future.get() throws checked exceptions, which put(...) cannot declare
        try {
          future.get();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new ConnectException("Interrupted while reporting errant record", ie);
        } catch (ExecutionException ee) {
          throw new ConnectException("Failed to report errant record", ee.getCause());
        }
      } else {
        // There's no error reporter, so fail
        throw new ConnectException("Failed on record", e);
      }
    }
  }
}


Synchrony

The error reporting functionality is asynchronous, although tasks can use the resulting future to wait for the record and exception to be written to Kafka.
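The difference between the two styles is only whether the task calls get() on the returned future. The sketch below makes the timing observable with a hypothetical GatedReporter whose background write waits on a latch, standing in for a slow DLQ producer; it is not the Connect implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in reporter: each write runs on a background thread and waits on a latch,
// simulating a slow DLQ producer. Not the real Connect implementation.
class GatedReporter implements AutoCloseable {
    private final ExecutorService writer = Executors.newSingleThreadExecutor();
    private final CountDownLatch gate = new CountDownLatch(1);

    // Returns immediately; the "write" finishes only after release() is called.
    Future<Void> report(String record, Throwable error) {
        return writer.submit(() -> {
            gate.await();
            return null;
        });
    }

    // Lets pending and subsequent writes complete.
    void release() {
        gate.countDown();
    }

    @Override
    public void close() {
        writer.shutdownNow();
    }
}
```

A task that moves on after report(...) gets asynchronous behavior; a task that calls get() right away blocks until the write has finished.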

Guarantees

The Connect framework also guarantees that by the time preCommit(...) is called on the task, the error reporter will have successfully and fully recorded all reported records with offsets at or before those passed to the preCommit method. Sink task implementations that need stricter guarantees can use the futures returned by report(...) to wait for confirmation that reported records have been successfully recorded.
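A task that wants the stricter guarantee can keep the futures returned by report(...) and wait on all of them before treating the corresponding offsets as safe to commit. The sketch below shows that bookkeeping with hypothetical names (PendingReports, track, awaitAll) that are not part of the Connect API; in a real task, awaitAll would presumably be called from preCommit(...) or flush(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical bookkeeping helper (not part of Connect) for futures returned by report(...).
class PendingReports {
    private final List<Future<Void>> outstanding = new ArrayList<>();

    // Remember a future returned by report(...).
    void track(Future<Void> future) {
        outstanding.add(future);
    }

    // Blocks until every tracked report has been written. If any write failed, the
    // ExecutionException propagates and the tracked futures are kept, so the caller
    // should not treat the corresponding offsets as safe to commit.
    void awaitAll() throws InterruptedException, ExecutionException {
        for (Future<Void> future : outstanding) {
            future.get();
        }
        outstanding.clear();
    }

    int size() {
        return outstanding.size();
    }
}
```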

Metrics

No new metrics will be added for the error reporter. Instead, the metrics detailed in KIP-298 for the dead letter queue will be used for the error reporter.

Errors Tolerance

The Errant Record Reporter will adhere to the existing DLQ error tolerance functionality. For example, if `errors.tolerance` is set to `all`, all errors will be tolerated; if the property is set to `none`, the Errant Record Reporter will throw an exception stating that the tolerance has been exceeded. Even if the developer chooses to catch and swallow any of these exceptions thrown by `report(...)`, the task is guaranteed to be killed once `put(...)` completes.
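The interaction between the tolerance setting and a task that swallows the exception can be pictured with the following sketch. Everything in it is a hypothetical stand-in: the Tolerance enum, ToleranceAwareReporter, and the runPutAndCheckKilled helper, which plays the framework's role. It only illustrates the behavior described above. Under `none`, the first report throws and marks the task as failed, so the task is killed after `put(...)` returns even though the exception was caught.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical stand-ins illustrating errors.tolerance semantics; not the Connect implementation.
class ToleranceSketch {
    enum Tolerance { NONE, ALL }

    static class ToleranceAwareReporter {
        private final Tolerance tolerance;
        private boolean toleranceExceeded = false;

        ToleranceAwareReporter(Tolerance tolerance) {
            this.tolerance = tolerance;
        }

        Future<Void> report(String record, Throwable error) {
            if (tolerance == Tolerance.NONE) {
                toleranceExceeded = true; // remembered even if the caller swallows the exception
                throw new IllegalStateException("tolerance exceeded", error);
            }
            return CompletableFuture.completedFuture(null); // ALL: the record is tolerated
        }

        boolean toleranceExceeded() {
            return toleranceExceeded;
        }
    }

    // Runs a put(...)-style loop, then reports whether the framework would kill the task.
    static boolean runPutAndCheckKilled(ToleranceAwareReporter reporter, List<String> records,
                                        Consumer<String> process) {
        for (String record : records) {
            try {
                process.accept(record);
            } catch (RuntimeException e) {
                try {
                    reporter.report(record, e);
                } catch (RuntimeException swallowed) {
                    // A badly behaved task ignores the tolerance error here.
                }
            }
        }
        return reporter.toleranceExceeded();
    }
}
```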

FileSinkTask Example

This KIP will update the FileSinkTask example to use the new API, to demonstrate proper error handling with the ability to use it on older versions of Kafka Connect.

Compatibility, Deprecation, and Migration Plan

Backwards Compatibility

This proposal is backward compatible such that existing sink connector implementations will continue to work as before. Developers can optionally modify sink connector implementations to use the new error reporting feature, yet still easily support installing and running those connectors in older Connect runtimes where the feature does not exist.

Moreover, to ensure that new connectors using this new method and interface can still be deployed on older versions of Kafka Connect, the developer should use a try-catch block to catch the NoSuchMethodError or NoClassDefFoundError thrown by a worker running an older version of AK. In that case, the task sets its reporter to null and proceeds as if no error reporter had been configured.

Rejected Alternatives

  1. New library or interface: creating a separate library like connect-reporter will cause redundancy in configuration, and creating any new interface or API will limit backwards compatibility. Deploying a connector on an old version of Connect will not be a seamless integration with the introduction of a new interface.
  2. Exposing ErrorReporter and ProcessingContext as public APIs: the main issue with this design is that the API would expose classes from the runtime packages, and those package names cannot be changed.
  3. Labeling as a dead letter queue: this is too specific a label; using "error reporter" creates a more general contract of use
  4. Batch error reporting: this would introduce the need for keeping track of order, and the increase in throughput doesn't seem to outweigh the added complication
  5. Creating a setter method in SinkTask that accepts an error reporter object: while this allows for backwards compatibility, it doesn't follow previous patterns of adding these types of methods to SinkTaskContext and is not a very aesthetic addition to the interface
  6. Creating an overloaded put(...) method that accepts an error reporter object and deprecating the original put(...) method: deprecating the original method can cause confusion on which method to implement and if the wrong method is implemented, it can cause backwards compatibility issues
  7. Synchronous-only functionality: this limits developer control, and much of the functionality within Kafka Connect has traditionally been asynchronous
  8. Using Callback rather than Future: because a sink task's callback is called from the producer thread, it can risk a poorly written sink task callback killing the reporter's producer without necessarily failing the task