
Status

Current state: Accepted

Discussion thread: here 

JIRA: here 

...

There is extensive error-handling and retry capability (KIP-298, KIP-458), but none of it applies to the step where the actual write to Kafka fails.

Public Interfaces

...

SourceTask.java

Code Block
    /**
     * <p>
     * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
     * also called when a record is filtered by a transformation, or when a producer write fails and {@link ConnectorConfig}
     * "errors.tolerance" is set to "all", and thus the record will never be ACK'd by a broker.
     * In both cases {@code metadata} will be null.
     * </p>
     * <p>
     * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
     * automatically. This hook is provided for systems that also need to store offsets internally
     * in their own system.
     * </p>
     * <p>
     * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
     * not necessary to implement both methods.
     * </p>
     *
     * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
     * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
     * @throws InterruptedException
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata)
            throws InterruptedException {
        // by default, just call other method for backwards compatibility
        commitRecord(record);
    }
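To illustrate how a connector might use this hook, here is a minimal, self-contained sketch. It deliberately avoids the real Kafka Connect classes: `SourceRecordStub`, `MetadataStub`, and the `SourceSystem` acknowledgement interface are hypothetical stand-ins for `SourceRecord`, `RecordMetadata`, and whatever client the connector uses for its source system. The point it shows is that null metadata alone does not tell the task whether the record was filtered by a transformation or dropped after a producer failure, so the sketch treats both cases the same way.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitRecordSketch {

    // Hypothetical stand-in for org.apache.kafka.connect.source.SourceRecord.
    record SourceRecordStub(String sourceId, String value) {}

    // Hypothetical stand-in for org.apache.kafka.clients.producer.RecordMetadata.
    record MetadataStub(String topic, int partition, long offset) {}

    // Hypothetical client for the external system the connector reads from.
    interface SourceSystem {
        void ack(String sourceId);
    }

    // Same shape as SourceTask.commitRecord(SourceRecord, RecordMetadata), minus the real types.
    static final class AckingTask {
        private final SourceSystem source;
        // Ids of records that Connect reported as never written to Kafka.
        final List<String> notWrittenToKafka = new ArrayList<>();

        AckingTask(SourceSystem source) {
            this.source = source;
        }

        void commitRecord(SourceRecordStub record, MetadataStub metadata) {
            if (metadata == null) {
                // Filtered by a transformation, or dropped after a producer failure with
                // errors.tolerance=all: null metadata alone cannot tell these apart.
                notWrittenToKafka.add(record.sourceId());
            }
            // Either way Connect is done with the record, so ack it upstream
            // to stop the source system from redelivering it.
            source.ack(record.sourceId());
        }
    }

    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        AckingTask task = new AckingTask(acked::add);
        task.commitRecord(new SourceRecordStub("msg-1", "a"), new MetadataStub("orders", 0, 42L));
        task.commitRecord(new SourceRecordStub("msg-2", "b"), null);
        System.out.println(acked + " " + task.notWrittenToKafka); // [msg-1, msg-2] [msg-2]
    }
}
```

Acknowledging upstream is only one possible design; a connector could equally route the ids collected in `notWrittenToKafka` to its own dead-letter mechanism before acking.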

Other Changes

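This update changes the behavior of errors.tolerance. Connectors that set it to all must be prepared for producer exceptions to be tolerated: instead of the task failing, the record that could not be written is passed to commitRecord with null metadata and processing continues. See the compatibility and migration section below.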

WorkerSourceTask.java

Code Block
    producer.send(
            producerRecord,
            (recordMetadata, e) -> {
                if (e != null) {
                    log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
                    log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                    if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
                        commitTaskRecord(preTransformRecord, null);
                    } else {
                        producerSendException.compareAndSet(null, e);
                    }
...
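The callback logic above can be modeled in isolation. This is a simplified sketch, not the runtime code: `ToleranceType` is reduced to the two values of errors.tolerance, records and metadata are plain strings, and `commitTaskRecord` is a hypothetical stand-in that just records what it was given. The success branch, which the excerpt above does not show, is assumed here to commit the record with its broker metadata.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class SendCallbackSketch {

    // Simplified stand-in for the runtime's ToleranceType (values of errors.tolerance).
    enum ToleranceType { NONE, ALL }

    private final ToleranceType toleranceType;
    // First producer failure seen; the task thread checks this and fails the task.
    final AtomicReference<Throwable> producerSendException = new AtomicReference<>();
    // What the hypothetical commitTaskRecord stand-in received, as "record=metadata".
    final List<String> committed = new ArrayList<>();

    SendCallbackSketch(ToleranceType toleranceType) {
        this.toleranceType = toleranceType;
    }

    // Hypothetical stand-in for WorkerSourceTask.commitTaskRecord(SourceRecord, RecordMetadata).
    private void commitTaskRecord(String record, String metadata) {
        committed.add(record + "=" + metadata);
    }

    // Mirrors the producer callback: (metadata, exception) -> { ... }.
    void onCompletion(String record, String metadata, Exception e) {
        if (e != null) {
            if (toleranceType == ToleranceType.ALL) {
                // Skip the record: hand it to the task with null metadata and keep going.
                commitTaskRecord(record, null);
            } else {
                // Keep only the first failure; later ones must not overwrite it.
                producerSendException.compareAndSet(null, e);
            }
        } else {
            // Assumed success path: commit with the metadata returned by the broker.
            commitTaskRecord(record, metadata);
        }
    }

    public static void main(String[] args) {
        SendCallbackSketch tolerant = new SendCallbackSketch(ToleranceType.ALL);
        tolerant.onCompletion("r1", null, new RuntimeException("broker unavailable"));
        System.out.println(tolerant.committed + " " + tolerant.producerSendException.get()); // [r1=null] null

        SendCallbackSketch strict = new SendCallbackSketch(ToleranceType.NONE);
        strict.onCompletion("r1", null, new RuntimeException("first"));
        strict.onCompletion("r2", null, new RuntimeException("second"));
        System.out.println(strict.committed + " " + strict.producerSendException.get().getMessage()); // [] first
    }
}
```

Using `compareAndSet(null, e)` rather than `set(e)` means the task reports the earliest failure even if several in-flight sends fail before the task thread notices.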


...


RetryWithToleranceOperator.java

Code Block
    // For source connectors that want to skip kafka producer errors.
    // They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing
    // to write to kafka.
    public synchronized ToleranceType getErrorToleranceType() {
        return errorToleranceType;
    }
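The comment above explains why a dedicated getter is needed instead of withinToleranceLimits(). The sketch below models that reasoning with a hypothetical, simplified `Operator` class; its withinToleranceLimits() logic is an assumption for illustration (with NONE, only a previously recorded failure exceeds the limit), not a copy of RetryWithToleranceOperator. It also assumes, as the comment implies, that a producer write failure is not recorded by the operator.

```java
final class ToleranceSketch {

    enum ToleranceType { NONE, ALL }

    // Simplified model of RetryWithToleranceOperator; not the real class.
    static final class Operator {
        private final ToleranceType errorToleranceType;
        private long totalFailures = 0;

        Operator(ToleranceType errorToleranceType) {
            this.errorToleranceType = errorToleranceType;
        }

        // Failures in stages the operator manages (conversion, transformation, ...).
        synchronized void recordFailure() {
            totalFailures++;
        }

        // Assumed behaviour: with NONE, only an already recorded failure exceeds the limit.
        synchronized boolean withinToleranceLimits() {
            return errorToleranceType == ToleranceType.ALL || totalFailures == 0;
        }

        // The getter proposed by this KIP.
        synchronized ToleranceType getErrorToleranceType() {
            return errorToleranceType;
        }
    }

    public static void main(String[] args) {
        Operator strict = new Operator(ToleranceType.NONE);
        // No failure is on record when the producer write fails, so:
        System.out.println(strict.withinToleranceLimits());                       // true: would wrongly allow skipping
        System.out.println(strict.getErrorToleranceType() == ToleranceType.ALL);  // false: task fails as intended
    }
}
```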

Proposed Changes

A getter method, getErrorToleranceType(), will be added to RetryWithToleranceOperator. WorkerSourceTask will check it before allowing a failed producer write to Kafka to be skipped and acknowledged by the source connector.

On a Kafka producer failure, WorkerSourceTask will check

Code Block
retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)

to decide whether to ignore the exception. If so, it calls commitRecord(SourceRecord, RecordMetadata) with null metadata so the connector can acknowledge the record in its source system, and then continues processing. Otherwise it sets producerSendException and the current behavior (the task fails) applies.

Setting "errors.tolerance" to "all" therefore enables skipping records whose write to Kafka fails.


If exactly-once semantics are enabled, a producer write failure still fails the task unconditionally.
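The rule described in this section (skip a failed write only when errors.tolerance is all, and never under exactly-once semantics) can be sketched as a small pure function. The `exactlyOnceEnabled` parameter is a hypothetical flag standing in for however the worker knows exactly-once semantics are in effect, and `com.example.MySourceConnector` is a made-up connector class used only to build a sample config.

```java
import java.util.Map;

final class SkipDecisionSketch {

    // True when a record whose producer write failed may be skipped (handed to
    // commitRecord with null metadata) instead of failing the task.
    static boolean maySkipFailedWrite(Map<String, String> connectorConfig, boolean exactlyOnceEnabled) {
        if (exactlyOnceEnabled) {
            return false; // exactly-once: always fail the task
        }
        // errors.tolerance is "none" (the default) or "all"; this sketch compares exactly.
        String tolerance = connectorConfig.getOrDefault("errors.tolerance", "none");
        return "all".equals(tolerance);
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(
                "connector.class", "com.example.MySourceConnector", // hypothetical connector
                "errors.tolerance", "all");
        System.out.println(maySkipFailedWrite(config, false));   // true
        System.out.println(maySkipFailedWrite(config, true));    // false
        System.out.println(maySkipFailedWrite(Map.of(), false)); // false: default is "none"
    }
}
```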

Compatibility, Deprecation, and Migration Plan

Existing source connectors that do not set errors.tolerance to all are unaffected: by default, a producer write failure still kills the task.

(Hypothetical use case): Existing source connectors that set errors.tolerance to all and expect the task to die on producer failure will need to be updated to handle commitRecord being called with null metadata. It is unclear how many such connectors exist in the wild, because relying on that behavior would require human intervention on the source system to fix whatever caused the producer write to Kafka to fail in the first place.

Rejected Alternatives

Adding a new connect worker configuration item:

Existing configuration (errors.tolerance) could be used instead, and setting this at the Connect worker level would take the choice of how to handle Kafka write errors out of the hands of connector developers.

WorkerConfig.java

Code Block

    public static final String IGNORE_PRODUCER_EXCEPTIONS_CONFIG = "errors.producer.ignore";
    public static final boolean IGNORE_PRODUCER_EXCEPTIONS_DEFAULT = false;
    private static final String IGNORE_PRODUCER_EXCEPTIONS_DOC = "If true, the connector will ignore producer exceptions. "
            + "The SourceRecord that failed to be written will be handed to commitRecord for handling by the connector.";

...
                .define(IGNORE_PRODUCER_EXCEPTIONS_CONFIG, Type.BOOLEAN, IGNORE_PRODUCER_EXCEPTIONS_DEFAULT,
                        Importance.MEDIUM, IGNORE_PRODUCER_EXCEPTIONS_DOC)
...


SourceTask.java

Code Block
    /**      
     * <p>
     * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
     * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker. It is also
     * called when {@link WorkerConfig} "errors.producer.ignore" is set to true.
     * In both cases {@code metadata} will be null.      
     * </p>
     * <p>
     * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
     * automatically. This hook is provided for systems that also need to store offsets internally
     * in their own system.
     * </p>
     * <p>
     * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
     * not necessary to implement both methods.
     * </p>
     *
     * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
     * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
     * @throws InterruptedException
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata)
            throws InterruptedException {
        // by default, just call other method for backwards compatibility
        commitRecord(record);
    }


WorkerSourceTask.java

Code Block
    producer.send(
            producerRecord,
            (recordMetadata, e) -> {
                if (e != null) {
                    log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
                    log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                    if (workerConfig.getBoolean(WorkerConfig.IGNORE_PRODUCER_EXCEPTIONS_CONFIG)) {
                        commitTaskRecord(preTransformRecord, null);
                    } else {
                        producerSendException.compareAndSet(null, e);
                    }
...


New interface method to SourceTask:

...