...
Code Block |
---|
producer.send( producerRecord, (recordMetadata, e) -> { if (e != null) { log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); if (retryWithToleranceOperator.withinToleranceLimits(getErrorToleranceType().equals(ToleranceType.ALL)) { commitTaskRecord(preTransformRecord, null); } else { producerSendException.compareAndSet(null, e); } |
Proposed Changes
On kafka producer failure, WorkerSourceTask will check
RetryWithToleranceOperator.java
Code Block |
---|
// For source connectors that want to skip kafka producer errors.
// They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing
// to write to kafka.
public ToleranceType getErrorToleranceType() {
return errorToleranceType;
} |
Proposed Changes
An getter method will be added to RetryWithToleranceOperator that the WorkerSourceTask will check before allowing producer write errors to kafka to be skipped and acked by the source connector.
On kafka producer failure, WorkerSourceTask will check
Code Block |
---|
retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL |
Code Block |
retryWithToleranceOperator.withinToleranceLimits() |
to see if it should “ignore” this exception, call commitRecord(SourceRecord, RecordMetadata) to allow the connector to ack their source system, and continue processing. Otherwise it will set the producerSendException and the current behavior applies.
...