Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
			producer.send(
                    producerRecord,
                    (recordMetadata, e) -> {
                        if (e != null) {
                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                            if (retryWithToleranceOperator.withinToleranceLimits(getErrorToleranceType().equals(ToleranceType.ALL)) {
                                                 commitTaskRecord(preTransformRecord, null);
                            }
                            else {
                                producerSendException.compareAndSet(null, e);
                            }

Proposed Changes

On kafka producer failure, WorkerSourceTask will check 


RetryWithToleranceOperator.java

Code Block
    // For source connectors that want to skip kafka producer errors.
    // They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing
    // to write to kafka.
    public ToleranceType getErrorToleranceType() {
        return errorToleranceType;
    }

Proposed Changes

An getter method will be added to RetryWithToleranceOperator that the WorkerSourceTask will check before allowing producer write errors to kafka to be skipped and acked by the source connector.

On kafka producer failure, WorkerSourceTask will check 

Code Block
retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL
Code Block
retryWithToleranceOperator.withinToleranceLimits()

to see if it should “ignore” this exception, call commitRecord(SourceRecord, RecordMetadata) to allow the connector to ack their source system, and continue processing. Otherwise it will set the producerSendException and the current behavior applies.

...