...
There is extensive error handling and retry capability (KIP-298, KIP-458), but none of it applies when the actual write to Kafka fails.
Public Interfaces
...
WorkerConfig.java
Code Block |
---|
public static final String IGNORE_PRODUCER_EXCEPTIONS_CONFIG = "errors.producer.ignore";
public static final boolean IGNORE_PRODUCER_EXCEPTIONS_DEFAULT = false;
private static final String IGNORE_PRODUCER_EXCEPTIONS_DOC = "If true, the connector will ignore producer exceptions. "
+ "The SourceRecord that failed to be written will be handed to commitRecord for handling by the connector.";
...
.define(IGNORE_PRODUCER_EXCEPTIONS_CONFIG, Type.BOOLEAN, IGNORE_PRODUCER_EXCEPTIONS_DEFAULT,
Importance.MEDIUM, IGNORE_PRODUCER_EXCEPTIONS_DOC)
... |
SourceTask.java
Code Block |
---|
/**
 * <p>
 * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
 * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker. It is also
 * called when a producer send fails and {@link WorkerConfig} "errors.producer.ignore" is set to true.
 * In both of these cases {@code metadata} will be null.
 * </p>
 * <p>
 * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
 * automatically. This hook is provided for systems that also need to store offsets internally
 * in their own system.
 * </p>
 * <p>
 * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
 * not necessary to implement both methods.
 * </p>
 *
 * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
 * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
 * @throws InterruptedException
 */
public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
    // by default, just call other method for backwards compatibility
    commitRecord(record);
} |
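A connector that keeps its own bookkeeping could branch on whether `metadata` is null. The following is only a minimal sketch of that idea: `SourceRecord` and `RecordMetadata` here are simplified stand-ins for the Kafka classes so that the example compiles without Kafka on the classpath, and `TrackingTask` with its two lists is a hypothetical name, not part of this proposal.

```java
import java.util.ArrayList;
import java.util.List;

class CommitRecordSketch {
    // Simplified stand-ins for the Kafka classes (assumption, for illustration only).
    static class SourceRecord {
        final String key;
        final String value;
        SourceRecord(String key, String value) { this.key = key; this.value = value; }
    }

    static class RecordMetadata {
        final String topic;
        final long offset;
        RecordMetadata(String topic, long offset) { this.topic = topic; this.offset = offset; }
    }

    // Hypothetical task-side bookkeeping.
    static class TrackingTask {
        final List<SourceRecord> acked = new ArrayList<>();
        final List<SourceRecord> notWritten = new ArrayList<>();

        // Same parameter shape as commitRecord(SourceRecord, RecordMetadata) above.
        void commitRecord(SourceRecord record, RecordMetadata metadata) {
            if (metadata == null) {
                // Either filtered by a transformation or dropped after a producer exception.
                notWritten.add(record);
            } else {
                // The broker acknowledged the write.
                acked.add(record);
            }
        }
    }

    public static void main(String[] args) {
        TrackingTask task = new TrackingTask();
        task.commitRecord(new SourceRecord("k1", "v1"), new RecordMetadata("topic-a", 42L));
        task.commitRecord(new SourceRecord("k2", "v2"), null);
        System.out.println("acked=" + task.acked.size() + " notWritten=" + task.notWritten.size());
    }
}
```

Note that, under the Javadoc as written, a null `metadata` alone does not tell the task whether the record was filtered or dropped on a producer exception; a connector that needs to tell the two apart would need some additional signal.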
...
Code Block |
---|
producer.send(
    producerRecord,
    (recordMetadata, e) -> {
        if (e != null) {
            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
            if (workerConfig.getBoolean(WorkerConfig.IGNORE_PRODUCER_EXCEPTIONS_CONFIG)) {
                commitTaskRecord(preTransformRecord, null);
            } else {
                producerSendException.compareAndSet(null, e);
            } |
...
I would like to add a configuration item to WorkerConfig: "errors.producer.ignore".
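The effect of the flag on the failure path can be illustrated with a small self-contained sketch. It does not use the real worker or producer classes: `SendCallback`, `onCompletion`, and `committedWithoutMetadata` are hypothetical names chosen for the example, and records are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class IgnoreProducerExceptionsSketch {
    // Hypothetical stand-in for the worker's producer callback.
    static class SendCallback {
        private final boolean ignoreProducerExceptions;
        final AtomicReference<Exception> producerSendException = new AtomicReference<>();
        final List<String> committedWithoutMetadata = new ArrayList<>();

        SendCallback(boolean ignoreProducerExceptions) {
            this.ignoreProducerExceptions = ignoreProducerExceptions;
        }

        // Mirrors the (recordMetadata, e) callback shape; only the failure path is modeled.
        void onCompletion(String record, Exception e) {
            if (e == null) {
                return; // success handling is outside the scope of this sketch
            }
            if (ignoreProducerExceptions) {
                // Hand the failed record back to the task with null metadata.
                committedWithoutMetadata.add(record);
            } else {
                // Remember only the first failure so the task can be failed later.
                producerSendException.compareAndSet(null, e);
            }
        }
    }

    public static void main(String[] args) {
        SendCallback ignoring = new SendCallback(true);
        ignoring.onCompletion("r1", new RuntimeException("send failed"));
        System.out.println("ignored failures: " + ignoring.committedWithoutMetadata.size());

        SendCallback failing = new SendCallback(false);
        failing.onCompletion("r1", new RuntimeException("send failed"));
        System.out.println("stored exception: " + (failing.producerSendException.get() != null));
    }
}
```

With the default value of false, the first exception is stored and nothing is committed for the failed record, which matches the existing behavior; with true, the failed record is passed on and no exception is stored.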
...