...
This feature will directly make changes to the SinkTaskContext
class with the addition of a new method.
Method
This KIP will add a getter method to the SinkTaskContext
class that will return an error reporter
object. Sink connectors that wish to use an error reporter can call this method within their tasks.
The method and usage will look like the following:
...
public interface SinkTaskContext {
...
    /**
     * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}
     * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
     * the sink task will receive a {@link Future} that the task can optionally use to wait until
     * the failed record and exception have been written to Kafka via Connect's DLQ. Note that
     * the result of this method may be null if this connector has not been configured with a DLQ.
     *
     * <p>This method was added in Apache Kafka 2.9. Sink tasks that use this method but want to
     * maintain backward compatibility so they can also be deployed to older Connect runtimes
     * should guard the call to this method with a try-catch block, since calling this method
     * will result in a {@link NoSuchMethodError} when the sink connector is deployed to Connect
     * runtimes older than Kafka 2.9. For example:
     * <pre>
     * BiFunction<SinkRecord, Throwable, Future<Void>> reporter;
     * try {
     *     reporter = context.failedRecordReporter();
     * } catch (NoSuchMethodError e) {
     *     reporter = null;
     * }
     * </pre>
     *
     * @return the reporter function; null if no error reporter has been configured for the connector
     * @since 2.9
     */
    BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
}
The usage will look like the following:
private BiFunction<SinkRecord, Throwable, Future<Void>> reporter;

@Override
public void start(Map<String, String> props) {
    ...
    try {
        reporter = context.failedRecordReporter(); // may be null if DLQ not enabled
    } catch (NoSuchMethodError e) {
        // Will occur in Connect runtimes earlier than 2.9
        reporter = null;
    }
}

@Override
public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
        try {
            // attempt to send record to data sink
            process(record);
        } catch (Exception e) {
            if (reporter != null) {
                // Send errant record to error reporter
                Future<Void> future = reporter.apply(record, e);
                // Optionally wait until the failure has been recorded in Kafka
                try {
                    future.get();
                } catch (InterruptedException | ExecutionException ex) {
                    throw new ConnectException("Failed to report errant record", ex);
                }
            } else {
                // There's no error reporter, so fail
                throw new ConnectException("Failed on record", e);
            }
        }
    }
}
Proposed Changes
Error Reporter Object
Kafka Connect will create a BiFunction<SinkRecord, Throwable, Future<Void>>
object as the error reporter object. Along with the original errant sink record, the exception thrown will be sent to the error reporter to provide additional context. The only configurations exposed to the user and developer will be the following, taken from KIP-298:
...
As mentioned above, the error reporter will use the same configurations as the dead letter queue in KIP-298 to avoid redundant configuration. There will be no additional configurations for the Producer and AdminClient used under the hood aside from the existing producer. and admin. configurations present in the worker configurations and the producer.override. and admin.override. configurations present in the connector configurations. Serialization for the errant records will be done in the same manner as KIP-298.
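For reference, the KIP-298 error-handling properties that this feature reuses would be set in the sink connector configuration like the following sketch (the topic name and replication factor are illustrative values, not defaults mandated by this KIP):

```properties
# Tolerate record-level failures instead of failing the task immediately
errors.tolerance=all
# Route errant records to this DLQ topic (name is illustrative)
errors.deadletterqueue.topic.name=my-connector-dlq
errors.deadletterqueue.topic.replication.factor=3
# Attach failure context (exception, original topic/partition/offset) as record headers
errors.deadletterqueue.context.headers.enable=true
```

With errors.deadletterqueue.topic.name unset, failedRecordReporter() would return null, as described in the javadoc above.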
Order of Operations
The order of operations will proceed like so:
...
Once configured to use an error reporter, the task can send errant records to it within put(...)
Synchrony
The error reporting functionality is intended to be synchronous. If a record is sent to the error reporter, processing of the next errant record in apply(...)
will not begin until the producer successfully sends the errant record to Kafka. If the producer encounters an exception while sending the errant record to Kafka, the task will be killed; if the error reporting is failing as well, it does not make sense to keep running the connector. While this synchronous design will cause a throughput hit if many records are sent to the error reporter, a high number of errant records should not be a typical case in connector usage. Additionally, this throughput hit can provide a chance for the user to realize there is a high rate of errant records before many records are actually sent to the error reporter.
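The blocking hand-off described above can be sketched as follows. This is a toy model of the mechanism, not Connect's implementation: String stands in for SinkRecord and a single-threaded executor stands in for the DLQ producer, so the example runs standalone.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

public class SynchronousReporterSketch {
    // Records "written to the DLQ", in completion order.
    static final List<String> dlq = Collections.synchronizedList(new ArrayList<>());

    // Single-threaded executor playing the role of the DLQ producer.
    static final ExecutorService producer = Executors.newSingleThreadExecutor();

    // Toy reporter: the returned Future completes only once the errant
    // record has actually been appended to the "DLQ".
    static final BiFunction<String, Throwable, Future<Void>> reporter =
        (record, error) -> CompletableFuture.runAsync(
            () -> dlq.add(record + ": " + error.getMessage()), producer);

    public static void main(String[] args) throws Exception {
        // Blocking on get() means record-2 is not reported until record-1
        // has been durably "written" - the synchronous behavior described above.
        reporter.apply("record-1", new RuntimeException("bad schema")).get();
        reporter.apply("record-2", new RuntimeException("bad value")).get();
        producer.shutdown();
        System.out.println(dlq); // [record-1: bad schema, record-2: bad value]
    }
}
```

A task that does not need the delivery guarantee could simply ignore the returned Future, trading the ordering guarantee for throughput.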
...