THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
public interface SinkTaskContext { ... /** * Get the reporter to which the sink task can report problematic or failed {@link SinkRecord} * passed to the {@link SinkTask#put(Collection)} method. When reporting a failed record, * the sink task will receive a {@link Future} that the task can optionally use to wait until * the failed record and exception have been written to Kafka via Connect's DLQ. Note that * the result of this method may be null if this connector has not been configured with a DLQ. * * <p>This method was added in Apache Kafka 2.6. Sink tasks that use this method but want to * maintain backward compatibility so they can also be deployed to older Connect runtimes * should guard the call to this method with a try-catch block, since calling this method will result in a * {@link NoSuchMethodException} or {@link NoClassDefFoundError} when the sink connector is deployed to Connect runtimes * Connect runtimes older than Kafka 2.6. For example: * <pre> * BiFunction<SinkTask, Throwable, Future<Void>> ErrantRecordReporter reporter; * try { * reporter = context.failedRecordReporter(); * } catch (NoSuchMethodError | NoClassDefFoundError e) { * reporter = null; * } * </pre> * * @return the reporter function; null if no error reporter has been configured for the connector * @since 2.6 */ BiFunction<SinkTask, Throwable, Future<Void>> ErrantRecordReporter failedRecordReporter(); } |
...
The usage will look like the following:
Code Block |
---|
private BiFunction<SinkTask, Throwable, Future<Void> ErrantRecordReporter reporter; @Override public void start(Map<String, String> props) { ... try { reporter = context.failedRecordReporter(); // may be null if DLQ not enabled } catch (NoSuchMethodException | NoClassDefFoundError e) { // Will occur in Connect runtimes earlier than 2.6 reporter = null; } } @Override public void put(Collection<SinkRecord> records) { for (SinkRecord record: records) { try { // attempt to send record to data sink process(record); } catch(Exception e) { if (reporter != null) { // Send errant record to error reporter Future<Void> future = reporter.accept(record, e); // Optionally wait till the failure's been recorded in Kafka future.get(); } else { // There's no error reporter, so fail throw new ConnectException("Failed on record", e); } } } } |
...
Code Block |
---|
/** * Component that the sink task can use as it {@link SinkTask#put(Collection<SinkRecord>)} * Reporter of problematic records and the corresponding problems. * * @since 2.6 */ public interface ErrantRecordReporter { /** * Report a problematic record and the corresponding error to be written to the sink * connector's dead letter queue (DLQ). * * <p>This call is asynchronous and returns a {@link java.util.concurrent.Future Future} for the * {@link RecordMetadata} that will be assigned to the record in the DLQ topic. Invoking * {@link java.util.concurrent.Future#get() get()} on this future will block until the * record has been written and then return the metadata for the record * or throw any exception that occurred while sending the record. * If you want to simulate a simple blocking call you can call the <code>get()</code> method * immediately. * * @param record the problematic record; may not be null * @param error the error capturing the problem with the record; may not be null * @return a future that can be used to block until the record and error are written * to the DLQ * @since 2.6 */ default Future<RecordMetadata> report(SinkRecord record, Throwable error) { return report(record, error, null); } } |
Proposed Changes
Reporting
...