...

Interface

The interface will have one abstract method, plus a default overload that omits the callback, and will look like the following:

Code Block
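import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

/**
 * Component that a sink task can use to report problematic records, and the corresponding
 * problems, as it processes them in {@link SinkTask#put(java.util.Collection)}.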
 *
 * @since 2.9
 */
public interface ErrantRecordReporter {

    /**
     * Report a problematic record and the corresponding error to be written to the sink
     * connector's dead letter queue (DLQ).
     *
     * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future} for the
     * {@link RecordMetadata} that will be assigned to the record in the DLQ topic. Invoking
     * {@link java.util.concurrent.Future#get() get()} on this future will block until the
     * record has been written and then return the metadata for the record
     * or throw any exception that occurred while sending the record.
     * If you want to simulate a simple blocking call you can call the <code>get()</code> method
     * immediately.
     *
     * @param record the problematic record; may not be null
     * @param error  the error capturing the problem with the record; may not be null
     * @return a future that can be used to block until the record and error are written
     *         to the DLQ
     * @since 2.9
     */
    default Future<RecordMetadata> report(SinkRecord record, Throwable error) {
        return report(record, error, null);
    }

    /**
     * Report a problematic record and the corresponding error to be written to the sink
     * connector's dead letter queue (DLQ).
     *
     * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future} for the
     * {@link RecordMetadata} that will be assigned to the record in the DLQ topic. Invoking
     * {@link java.util.concurrent.Future#get() get()} on this future will block until the
     * record has been written and then return the metadata for the record
     * or throw any exception that occurred while sending the record.
     * If you want to simulate a simple blocking call you can call the <code>get()</code> method
     * immediately.
     *
     * <p>Fully non-blocking usage can make use of the {@link Callback} parameter to provide a
     * callback that will be invoked when the request is complete. Callbacks for records being
     * sent to the same partition are guaranteed to execute in order.
     *
     * @param record   the problematic record; may not be null
     * @param error    the error capturing the problem with the record; may not be null
     * @param callback a user-supplied callback to execute when the record has been acknowledged
     *                 by the server; may be null for no callback
     * @return a future that can be used to block until the record and error are written
     *         to the DLQ
     * @since 2.9
     */
    Future<RecordMetadata> report(SinkRecord record, Throwable error, Callback callback);
}
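The following is a minimal, self-contained sketch of the contract above, not the actual runtime implementation. So that it runs without Kafka on the classpath, it replaces SinkRecord, RecordMetadata, and Callback with hypothetical stand-ins (StubRecord, StubMetadata, StubCallback), and its in-memory reporter completes every future immediately instead of waiting for a broker acknowledgement. It shows the default two-argument overload delegating with a null callback, the blocking get() style, and the callback style.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class ReporterSketch {
    // Hypothetical stand-ins for SinkRecord, RecordMetadata, and the producer Callback.
    record StubRecord(String topic, int partition, long offset, Object value) {}
    record StubMetadata(String topic, long offset) {}

    interface StubCallback {
        void onCompletion(StubMetadata metadata, Exception exception);
    }

    // Same shape as ErrantRecordReporter: one abstract method plus a default
    // overload that delegates with a null callback.
    interface Reporter {
        default Future<StubMetadata> report(StubRecord record, Throwable error) {
            return report(record, error, null);
        }

        Future<StubMetadata> report(StubRecord record, Throwable error, StubCallback callback);
    }

    // Hypothetical in-memory "DLQ": each report is stored and its future completed at once.
    static class InMemoryReporter implements Reporter {
        final List<String> dlq = new ArrayList<>();

        @Override
        public Future<StubMetadata> report(StubRecord record, Throwable error, StubCallback callback) {
            if (record == null || error == null) {
                throw new NullPointerException("record and error may not be null");
            }
            dlq.add(record.value() + " -> " + error.getMessage());
            StubMetadata metadata = new StubMetadata("example-dlq", dlq.size() - 1);
            if (callback != null) {
                callback.onCompletion(metadata, null); // success: no exception
            }
            return CompletableFuture.completedFuture(metadata);
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        InMemoryReporter reporter = new InMemoryReporter();
        StubRecord bad = new StubRecord("orders", 0, 42L, "not-json");

        // Blocking style: call get() immediately on the returned future.
        StubMetadata written = reporter.report(bad, new IllegalArgumentException("unparseable")).get();
        System.out.println("DLQ offset " + written.offset()); // prints "DLQ offset 0"

        // Non-blocking style: pass a callback instead of waiting on the future.
        reporter.report(bad, new IllegalStateException("retry exhausted"),
                (metadata, exception) -> System.out.println(
                        exception == null ? "acked at " + metadata.offset() : "failed: " + exception));
    }
}
```

In the real runtime the future completes only after the DLQ producer receives an acknowledgement, so get() may actually block and the callback may run on a producer thread.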


Proposed Changes

Reporting

The SinkTaskContext class will create the error reporter object, an implementation of the ErrantRecordReporter interface. Along with the original errant sink record, the exception thrown due to that record will be passed to the error reporter to provide additional context. For asynchronous functionality, the error reporter will return a Future for each record reported. The interface has one abstract, asynchronous reporting method, which accepts the errant record, the exception thrown while processing it, and an optional Callback, and returns a Future<RecordMetadata>.
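As a rough illustration of a task passing both the record and its exception to the reporter, the sketch below assumes a hypothetical writeToSink() that rejects empty values and a simplified Reporter stand-in that returns Future<Long> rather than Future<RecordMetadata>. It collects the returned futures instead of waiting on each one, which keeps reporting asynchronous.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class PutLoopSketch {
    // Hypothetical stand-in for SinkRecord.
    record StubRecord(long offset, String value) {}

    // Simplified stand-in for ErrantRecordReporter (two-argument form only).
    interface Reporter {
        Future<Long> report(StubRecord record, Throwable error);
    }

    static final List<String> sink = new ArrayList<>();

    // Hypothetical sink write that rejects empty values.
    static void writeToSink(StubRecord record) {
        if (record.value().isEmpty()) {
            throw new IllegalArgumentException("empty value at offset " + record.offset());
        }
        sink.add(record.value());
    }

    // Sketch of a put() loop: each failing record is reported together with the
    // exception it caused, and the futures are returned for the caller to inspect.
    static List<Future<Long>> put(Collection<StubRecord> records, Reporter reporter) {
        List<Future<Long>> pending = new ArrayList<>();
        for (StubRecord record : records) {
            try {
                writeToSink(record);
            } catch (RuntimeException e) {
                pending.add(reporter.report(record, e));
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        Reporter reporter = (record, error) -> {
            dlq.add(record.offset() + ": " + error.getMessage());
            return CompletableFuture.completedFuture((long) dlq.size() - 1);
        };
        List<Future<Long>> pending = put(
                List.of(new StubRecord(0, "a"), new StubRecord(1, ""), new StubRecord(2, "c")),
                reporter);
        // prints "[a, c] [1: empty value at offset 1] 1"
        System.out.println(sink + " " + dlq + " " + pending.size());
    }
}
```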

The error reporter will use the same configurations as the dead letter queue in KIP-298 to avoid redundant configuration. There will be no additional configurations for the Producer and AdminClient used under the hood, aside from the existing producer. and admin. configurations in the worker configuration and the producer.override. and admin.override. configurations in the connector configuration. Errant records will be serialized in the same manner as in KIP-298.
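For illustration, a sink connector configuration that enables the KIP-298 dead letter queue, which the error reporter reuses, might look like the map below. The connector name, topic names, and replication factor are placeholders; the errors.* keys are the existing KIP-298 settings. Applying the producer.override. entry also assumes the worker's connector.client.config.override.policy permits that override.

```java
import java.util.Map;
import java.util.TreeMap;

class DlqConfigSketch {
    // Example connector configuration; names and values are placeholders.
    static Map<String, String> connectorConfig() {
        Map<String, String> config = new TreeMap<>();
        config.put("name", "example-sink");   // placeholder connector name
        config.put("topics", "orders");       // placeholder source topic
        // Tolerate errant records instead of failing the task.
        config.put("errors.tolerance", "all");
        // DLQ topic that reported records are written to (KIP-298 settings).
        config.put("errors.deadletterqueue.topic.name", "example-sink-dlq");
        config.put("errors.deadletterqueue.topic.replication.factor", "3");
        // Attach headers describing the error to each DLQ record.
        config.put("errors.deadletterqueue.context.headers.enable", "true");
        // Connector-level producer override, e.g. to keep DLQ writes in order.
        config.put("producer.override.max.in.flight.requests.per.connection", "1");
        return config;
    }

    public static void main(String[] args) {
        connectorConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```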

Synchrony

The error reporting functionality can be used either asynchronously or synchronously. By default, error reporting is asynchronous: reporting the next errant record does not wait for the prior errant record to be successfully written. If the developer prefers synchronous behavior, they can block before processing the next record by calling get() on the returned Future. To avoid error records being written out of order (for example, due to retries), the producer that writes error records should be configured with max.in.flight.requests.per.connection=1 (for example, through the producer. or producer.override. configurations). If the developer determines that order is not important and higher throughput is desired, this value can be increased.
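The two styles can be contrasted with a small sketch. The reporter here is a hypothetical stand-in backed by a single-threaded executor, so reports complete in submission order, loosely mirroring the ordering that max.in.flight.requests.per.connection=1 provides; it does not model producer retries or the real RecordMetadata.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SynchronySketch {
    // Stand-in reporter: the future yields the record's position in a fake DLQ.
    interface Reporter {
        Future<Integer> report(String record, Throwable error);
    }

    // Hypothetical reporter whose writes run one at a time, in submission order.
    static class ExecutorReporter implements Reporter, AutoCloseable {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final List<String> dlq = new ArrayList<>(); // touched only on the executor thread

        @Override
        public Future<Integer> report(String record, Throwable error) {
            return executor.submit(() -> {
                dlq.add(record + ": " + error.getMessage());
                return dlq.size() - 1;
            });
        }

        @Override
        public void close() {
            executor.shutdown();
        }
    }

    // Synchronous: block on each report before moving on to the next record.
    static void reportSync(Reporter reporter, List<String> errant)
            throws InterruptedException, ExecutionException {
        for (String record : errant) {
            reporter.report(record, new RuntimeException("bad record")).get();
        }
    }

    // Asynchronous: issue every report first, then wait for all of them once.
    static List<Integer> reportAsync(Reporter reporter, List<String> errant)
            throws InterruptedException, ExecutionException {
        List<Future<Integer>> pending = new ArrayList<>();
        for (String record : errant) {
            pending.add(reporter.report(record, new RuntimeException("bad record")));
        }
        List<Integer> positions = new ArrayList<>();
        for (Future<Integer> future : pending) {
            positions.add(future.get());
        }
        return positions;
    }

    public static void main(String[] args) throws Exception {
        try (ExecutorReporter reporter = new ExecutorReporter()) {
            reportSync(reporter, List.of("r1", "r2"));
            System.out.println(reportAsync(reporter, List.of("r3", "r4"))); // prints [2, 3]
        }
    }
}
```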

...

Compatibility, Deprecation, and Migration Plan

Backwards Compatibility

A major driving force behind this design is backward compatibility: adding the error reporter should be as seamless as possible for connectors that may run on older versions of Kafka Connect. This proposal is backward compatible such that existing sink connector implementations will continue to work as before. Developers can optionally modify sink connector implementations to use the new error reporting feature, yet still easily support installing and running those connectors in older Connect runtimes where the feature does not exist.

With respect to the new method, to ensure that new connectors using it can still be deployed on older versions of Kafka Connect, the developer can wrap the call in a try-catch block that catches the NoSuchMethodError (and NoClassDefFoundError, if the ErrantRecordReporter interface is also absent) raised when the connector runs on a worker with an older version of AK.
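A minimal sketch of such a guard follows. Because the new method is not named in this section, TaskContext and errantRecordReporter() are hypothetical placeholders. The older runtime is simulated by throwing NoSuchMethodError explicitly, whereas a real JVM raises it when the call fails to link against the older interface.

```java
class CompatSketch {
    // Hypothetical stand-in for the error reporter.
    interface Reporter {
        void report(String record, Throwable error);
    }

    // Hypothetical stand-in for the sink task context and its new method.
    interface TaskContext {
        Reporter errantRecordReporter();
    }

    // Returns null when the worker predates the new method (NoSuchMethodError)
    // or lacks the reporter interface entirely (NoClassDefFoundError), so the
    // task can fall back to its previous error handling.
    static Reporter reporterOrNull(TaskContext context) {
        try {
            return context.errantRecordReporter();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Simulated older worker: the call fails as if the method were missing.
        TaskContext oldWorker = () -> { throw new NoSuchMethodError("errantRecordReporter"); };
        // Simulated newer worker: a reporter is available.
        TaskContext newWorker = () -> (record, error) -> System.out.println("reported " + record);

        System.out.println(reporterOrNull(oldWorker) == null); // prints true
        System.out.println(reporterOrNull(newWorker) != null); // prints true
    }
}
```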

...