...
Code Block |
---|
private ErrantRecordReporter reporter; @Override public void start(Map<String, String> props) { ... try { reporter = context.failedRecordReportererrantRecordReporter(); // may be null if DLQ not enabled } catch (NoSuchMethodException | NoClassDefFoundError e) { // Will occur in Connect runtimes earlier than 2.6 reporter = null; } } @Override public void put(Collection<SinkRecord> records) { for (SinkRecord record: records) { try { // attempt to send record to data sink process(record); } catch(Exception e) { if (reporter != null) { // Send errant record to error reporter Future<Void> future = reporter.acceptreport(record, e); // Optionally wait till the failure's been recorded in Kafka future.get(); } else { // There's no error reporter, so fail throw new ConnectException("Failed on record", e); } } } } |
...
No new metrics will be added for the error reporter. Instead, the metrics detailed in KIP-298 for the dead letter queue will be used for the error reporter.
Errors Tolerance
The Errant Record Reporter will adhere to the existing DLQ error tolerance functionality. For example, if `errors.tolerance` is set to `ALL`, all errors will be tolerated; if the property is set to `NONE`, then the Errant Record Reporter will throw an exception detailing that the tolerance has been exceeded. Even if the developer chooses to catch and swallow any of these exceptions thrown during `report(...)`, the task is guaranteed to be killed following the completion of `put(...)`.
FileSinkTask Example
This KIP will update the FileSinkTask
example to use the new API, to demonstrate proper error handling with the ability to use it on older versions of Kafka Connect.
...