...

The error reporting functionality can be either asynchronous or synchronous. By default, error reporting is asynchronous: the task does not have to wait for the prior errant record to be successfully reported before it processes the next errant record. If the developer prefers synchronous behavior, they can block processing of the next record by calling get() on the Future returned by the reporter. To avoid error records being written out of order (for example, due to retries), the developer can set max.in.flight.requests.per.connection=1 for the producer that writes the error records.
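The following is a minimal, self-contained sketch of the asynchronous and synchronous modes described above, assuming the reporter is a BiFunction that returns a Future<Void>. FakeRecord, reporter(...), put(...) and the "null value means errant" rule are hypothetical stand-ins, not Kafka Connect API types, and a single-threaded executor plays the role of the worker's producer. In synchronous mode each report is awaited with future.get(), which also surfaces a reporting failure as an ExecutionException; in asynchronous mode the futures are collected and processing continues immediately.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

public class ErrantRecordReportingSketch {

    /** Hypothetical stand-in for a sink record (only a key and a value). */
    static final class FakeRecord {
        final String key;
        final String value;
        FakeRecord(String key, String value) { this.key = key; this.value = value; }
    }

    /** Collects "reported" records; stands in for the error reporting topic. */
    static final List<String> reported = Collections.synchronizedList(new ArrayList<>());

    /** Hypothetical reporter that "writes" each errant record on a background thread. */
    static BiFunction<FakeRecord, Throwable, Future<Void>> reporter(ExecutorService pool) {
        return (record, error) -> CompletableFuture.runAsync(
                () -> reported.add(record.key + ": " + error.getMessage()), pool);
    }

    /**
     * Processes a batch, reporting records with a null value as errant.
     * With synchronous=true, each report must finish before the loop moves on.
     * Returns the still-pending futures when running asynchronously.
     */
    static List<Future<Void>> put(List<FakeRecord> records,
                                  BiFunction<FakeRecord, Throwable, Future<Void>> reporter,
                                  boolean synchronous)
            throws InterruptedException, ExecutionException {
        List<Future<Void>> pending = new ArrayList<>();
        for (FakeRecord record : records) {
            if (record.value == null) {
                Future<Void> future =
                        reporter.apply(record, new IllegalArgumentException("null value"));
                if (synchronous) {
                    future.get(); // blocks; a failed report is rethrown as ExecutionException
                } else {
                    pending.add(future); // asynchronous: continue with the next record at once
                }
            }
            // ... valid records would be written to the external system here ...
        }
        return pending;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            List<FakeRecord> batch = List.of(
                    new FakeRecord("k1", "ok"),
                    new FakeRecord("k2", null),
                    new FakeRecord("k3", null));
            put(batch, reporter(pool), true); // synchronous mode
            for (Future<Void> f : put(batch, reporter(pool), false)) {
                f.get(); // demo only: wait so the printed list is complete
            }
            System.out.println(reported);
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real connector the reporter would be supplied by the Connect framework rather than built from a local executor, and the ordering of the written error records would depend on the producer settings mentioned above, not on anything in this sketch.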

Additionally, if the producer encounters an exception while sending the errant record to Kafka, the task will be killed: if error reporting is failing as well, it does not make sense to keep running the connector.

Metrics

The following metrics will be used to monitor the error reporter:

...

Metrics

No new metrics will be added for the error reporter. Instead, the metrics detailed in KIP-298 for the dead letter queue will be used for the error reporter.

...

Compatibility, Deprecation, and Migration Plan

...

A major driving force and main advantage behind this design is its backwards compatibility. The goal is for the addition of the error reporter to be as seamless as possible when a connector runs on older versions of Kafka Connect. If a user deploys a connector on an older version of Kafka Connect, errantRecordReporter(...) will act as a NOP; thus, the connector's error reporter variable will remain null and the task can proceed accordingly. Additionally, to ensure that users can deploy connectors on any version of Kafka Connect, the error reporter object uses a Java standard library interface, BiFunction, rather than a new interface created specifically for this functionality. This ensures that the interface will always be on the classpath of any version of AK.

With respect to the new method, to ensure that new connectors using it can still be deployed on older versions of Kafka Connect, the developer can wrap the call in a try-catch block that catches the NoSuchMethodError thrown when the connector runs on a worker with an older version of AK.
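A minimal sketch of this backwards-compatibility pattern, assuming the reporter is obtained through a method named errantRecordReporter() as in the text above. FakeTaskContext, NEW_WORKER, OLD_WORKER, lookupReporter(...) and handleErrant(...) are hypothetical names, and String stands in for the sink record type. On a real older worker the JVM raises NoSuchMethodError when the missing method is invoked; here OLD_WORKER simulates that by throwing it explicitly. In both cases the task ends up with a null reporter and proceeds without error reporting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

public class ReporterCompatibilitySketch {

    /** Hypothetical stand-in for the context the worker hands to a sink task. */
    interface FakeTaskContext {
        BiFunction<String, Throwable, Future<Void>> errantRecordReporter();
    }

    /** Simulates a newer worker: the method exists and returns a reporter. */
    static final FakeTaskContext NEW_WORKER =
            () -> (record, error) -> CompletableFuture.completedFuture(null);

    /** Simulates an older worker, where the method is missing at runtime. */
    static final FakeTaskContext OLD_WORKER = () -> {
        throw new NoSuchMethodError("errantRecordReporter");
    };

    /** Looks up the reporter once (for example, when the task starts). */
    static BiFunction<String, Throwable, Future<Void>> lookupReporter(FakeTaskContext context) {
        try {
            return context.errantRecordReporter();
        } catch (NoSuchMethodError e) {
            return null; // older AK worker: no error reporter is available
        }
    }

    /** Reports through the reporter if there is one; returns whether it did. */
    static boolean handleErrant(BiFunction<String, Throwable, Future<Void>> reporter,
                                String record, Throwable error) {
        if (reporter == null) {
            return false; // connector-specific fallback, e.g. fail the task or skip the record
        }
        reporter.apply(record, error);
        return true;
    }

    public static void main(String[] args) {
        System.out.println(lookupReporter(NEW_WORKER) != null); // true
        System.out.println(lookupReporter(OLD_WORKER) != null); // false
    }
}
```

Because BiFunction and Future are part of the JDK, this code never references a class that an older worker might lack; only the missing method itself has to be guarded.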

Rejected Alternatives

  1. New library or interface: creating a separate library like connect-reporter would cause redundancy in configuration, and creating any new interface or API would limit backwards compatibility. Deploying a connector on an old version of Connect would not be a seamless integration with the introduction of a new interface.
  2. Exposing ErrorReporter and ProcessingContext as public APIs: the main issue with this design is that the API would expose packages with runtime in their names, and package names cannot be changed.
  3. Labeling as a dead letter queue
  4. Asynchronous error reporting: this introduces multiple considerations along each step of the error reporting, which complicates the design and implementation; the increase in throughput doesn't outweigh the added complication
  5. Batch error reporting: this would introduce the need for keeping track of order, and the increase in throughput doesn't seem to outweigh the added complication
  6. Creating a setter method in SinkTask that accepts an error reporter object:
  7. Creating an overloaded put(...) method that accepts an error reporter object:
  8. Synchronous-only functionality: