Status
Current state: Under Discussion
Discussion thread: Here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, KIP-298 provides error handling in Kafka Connect that includes functionality such as retrying, logging, and sending errant records to a dead letter queue. However, the dead letter queue functionality from KIP-298 only supports error reporting within the context of transform operations and key, value, and header converter operations. Once records are sent to the connector for processing, there is no support for dead letter queue or error reporting functionality.
As stated in the rejected alternatives of KIP-298: "Write records that fail in the put() step of a sink connector to the dead letter queue: since sink connectors can choose to batch records in a put() method, it is not clear what errors are caused by what records (they might be because of records that were immediately written to put(), or by some previous records that were processed later). Also, there might be connection issues that are not handled by the connector, and simply bubbled up as IOException (for example). Effectively, errors sent back to the framework from the put() method currently do not have sufficient context to determine the problematic records (if any). Addressing these issues would need a separate KIP."
Thus, this proposal aims to extend KIP-298 by adding error reporting functionality even after records are sent to connector tasks, without introducing redundant configuration.
Public Interfaces
This feature will directly make changes to the SinkTaskContext class with the addition of a new method.
Method
This KIP will add a getter method to the SinkTaskContext class that will return an error reporter object. Sink connectors that wish to use an error reporter can call this method within their tasks. The method will look like the following:
```java
public interface SinkTaskContext {
    ...
    /**
     * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord}s
     * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record,
     * the sink task will receive a {@link Future} that the task can optionally use to wait until
     * the failed record and exception have been written to Kafka via Connect's DLQ. Note that
     * the result of this method may be null if this connector has not been configured with a DLQ.
     *
     * <p>This method was added in Apache Kafka 2.9. Sink tasks that use this method but want to
     * maintain backward compatibility so they can also be deployed to older Connect runtimes
     * should guard the call to this method with a try-catch block, since calling this method will
     * result in a {@link NoSuchMethodError} when the sink connector is deployed to Connect
     * runtimes older than Kafka 2.9. For example:
     * <pre>
     *     BiFunction<SinkRecord, Throwable, Future<Void>> reporter;
     *     try {
     *         reporter = context.failedRecordReporter();
     *     } catch (NoSuchMethodError e) {
     *         reporter = null;
     *     }
     * </pre>
     *
     * @return the reporter function; null if no error reporter has been configured for the connector
     * @since 2.9
     */
    BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordReporter();
}
```
The usage will look like the following:
```java
private BiFunction<SinkRecord, Throwable, Future<Void>> reporter;

@Override
public void start(Map<String, String> props) {
    ...
    try {
        reporter = context.failedRecordReporter(); // may be null if DLQ not enabled
    } catch (NoSuchMethodError e) {
        // Will occur in Connect runtimes earlier than 2.9
        reporter = null;
    }
}

@Override
public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
        try {
            // attempt to send record to data sink
            process(record);
        } catch (Exception e) {
            if (reporter != null) {
                // Send errant record to error reporter
                Future<Void> future = reporter.apply(record, e);
                // Optionally wait until the failure has been recorded in Kafka
                try {
                    future.get();
                } catch (InterruptedException | ExecutionException ex) {
                    throw new ConnectException("Failed to report errant record", ex);
                }
            } else {
                // There's no error reporter, so fail
                throw new ConnectException("Failed on record", e);
            }
        }
    }
}
```
Proposed Changes
Error Reporter Object
The SinkTaskContext class will create a BiFunction<SinkRecord, Throwable, Future<Void>> object as the error reporter object. Along with the original errant sink record, the exception thrown will be sent to the error reporter to provide additional context. For asynchronous functionality, the error reporter will return a Future for each record processed. The error reporter will use the same configurations as the dead letter queue in KIP-298 to avoid redundant configuration. There will be no additional configurations for the Producer and AdminClient under the hood aside from the existing producer. and admin. configurations present in the worker configurations and the producer.override. and admin.override. configurations present in the connector configurations. Serialization of the errant records will be done in the same manner as in KIP-298.
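For reference, the error reporter would be enabled through the existing KIP-298 error-handling properties in the sink connector configuration; no new properties are introduced by this KIP. The topic name below is illustrative:

```properties
# Existing KIP-298 error-handling configuration (reused by the error reporter).
# Tolerate record-level failures instead of failing the task.
errors.tolerance = all
# Dead letter queue topic to which the error reporter writes errant records
# (topic name is an example).
errors.deadletterqueue.topic.name = my-connector-errors
errors.deadletterqueue.topic.replication.factor = 3
# Include failure context (exception, original topic/partition/offset) in record headers.
errors.deadletterqueue.context.headers.enable = true
```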
Synchrony
The error reporting functionality can be either asynchronous or synchronous. By default, error reporting will be asynchronous; processing of a subsequent errant record is not blocked on the completion of reporting for a prior errant record. However, if the developer prefers synchronous functionality, they can block on the returned Future with future.get() before processing the next record. To avoid errant records being written to Kafka out of order (for example, due to retries), max.in.flight.requests.per.connection=1 can be set for the producer used to write the error records.
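The two styles above can be sketched as follows. This is an illustrative, self-contained example: the reporter is modeled as a CompletableFuture-based stub keyed on plain strings rather than the actual Connect SinkRecord and DLQ producer, and all names in it are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

public class ReporterSynchronyDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Stand-in reporter: records the failure and returns an already-completed Future.
        List<String> dlq = new ArrayList<>();
        BiFunction<String, Throwable, Future<Void>> reporter = (record, error) -> {
            dlq.add(record + " -> " + error.getMessage());
            return CompletableFuture.completedFuture(null);
        };

        // Asynchronous style (the default): fire off reports and only wait at the
        // end of the batch, so one report does not block the next record.
        List<Future<Void>> pending = new ArrayList<>();
        pending.add(reporter.apply("record-1", new RuntimeException("bad schema")));
        pending.add(reporter.apply("record-2", new RuntimeException("bad value")));
        for (Future<Void> f : pending) {
            f.get(); // all failures are now durably recorded
        }

        // Synchronous style: block on each report before handling the next record.
        reporter.apply("record-3", new RuntimeException("bad key")).get();

        System.out.println(dlq.size());
    }
}
```

In a real connector the same trade-off applies: collecting the Futures and waiting at a batch boundary preserves throughput, while calling get() per record gives a strict ordering guarantee at the cost of latency.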
Metrics
No new metrics will be added for the error reporter. Instead, the metrics detailed in KIP-298 for the dead letter queue will be used for the error reporter.
Compatibility, Deprecation, and Migration Plan
Backwards Compatibility
A major driving force and advantage behind this design is its backwards compatibility. The idea is for the addition of the error reporter to be as seamless as possible when running on older versions of Kafka Connect. Thus, rather than creating a new interface for the error reporter, the Java standard library interface BiFunction is used as the error reporter object. This ensures that the interface will always be on the classpath of any version of AK.
With respect to the new method, to ensure that new connectors using this method can still be deployed on older versions of Kafka Connect, the developer can use a try-catch block to catch the NoSuchMethodError thrown by a worker running an older version of AK.
Rejected Alternatives
- New library or interface: creating a separate library like connect-reporter will cause redundancy in configuration, and creating any new interface or API will limit backwards compatibility. Deploying a connector on an old version of Connect would not be a seamless integration with the introduction of a new interface.
- Exposing ErrorReporter and ProcessingContext as public APIs: the main issue with this design is that the API exposes packages with runtime in their names, and package names cannot be changed.
- Labeling as a dead letter queue: this is too specific a label; "error reporter" creates a more general contract for the use case.
- Batch error reporting: this would introduce the need to keep track of ordering, and the increase in throughput doesn't seem to outweigh the added complication.
- Creating a setter method in SinkTask that accepts an error reporter object: while this allows for backwards compatibility, it doesn't follow previous patterns of adding these types of methods to SinkTaskContext and is not a very aesthetic addition to the interface.
- Creating an overloaded put(...) method that accepts an error reporter object and deprecating the original put(...) method: deprecating the original method can cause confusion over which method to implement, and implementing the wrong method can cause backwards compatibility issues.
- Synchronous-only functionality: this limits developer control over functionality, and traditionally much of the functionality within Kafka Connect has been asynchronous.
- Using Callback rather than Future: because a sink task's callback is called from the producer thread, a poorly written sink task callback risks killing the reporter's producer without necessarily failing the task.