Status

Current state: Accepted

Discussion thread: here 

JIRA: here 

...

Code Block
    /**
     * <p>
     * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
     * also called when a record is filtered by a transformation, or when the producer fails to send a record and
     * {@link ConnectorConfig} "errors.tolerance" is set to "all". In both cases the record will never be ACK'd by a
     * broker and {@code metadata} will be null.
     * </p>
     * <p>
     * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
     * automatically. This hook is provided for systems that also need to store offsets internally
     * in their own system.
     * </p>
     * <p>
     * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a no-op by default. It is
     * not necessary to implement both methods.
     * </p>
     *
     * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
     * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
     * @throws InterruptedException
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata)
            throws InterruptedException {
        // by default, just call other method for backwards compatibility
        commitRecord(record);
    }
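
As an illustration of the contract described in the Javadoc above, the following is a minimal sketch of how a task might branch on a null metadata argument. The SourceRecord and RecordMetadata classes here are hypothetical stand-ins for the Kafka types of the same name, so that the example compiles without Kafka on the classpath. AckingTask and its two lists are also assumptions made for illustration; a real task would extend SourceTask and acknowledge records in its own source system.

```java
import java.util.ArrayList;
import java.util.List;

public class NullMetadataCommitSketch {

    // Hypothetical stand-in for org.apache.kafka.connect.source.SourceRecord.
    static final class SourceRecord {
        final String key;
        SourceRecord(String key) { this.key = key; }
    }

    // Hypothetical stand-in for org.apache.kafka.clients.producer.RecordMetadata.
    static final class RecordMetadata {
        final long offset;
        RecordMetadata(long offset) { this.offset = offset; }
    }

    // Hypothetical task-side bookkeeping; a real task would extend SourceTask.
    static final class AckingTask {
        final List<String> acked = new ArrayList<>();
        final List<String> skipped = new ArrayList<>();

        public void commitRecord(SourceRecord record, RecordMetadata metadata) {
            if (metadata == null) {
                // Either the record was filtered by a transformation, or the send
                // failed while errors.tolerance=all. No broker offset exists.
                skipped.add(record.key);
            } else {
                // The broker acknowledged the record; its metadata is available.
                acked.add(record.key + "@" + metadata.offset);
            }
        }
    }

    public static void main(String[] args) {
        AckingTask task = new AckingTask();
        task.commitRecord(new SourceRecord("a"), new RecordMetadata(42L));
        task.commitRecord(new SourceRecord("b"), null);
        System.out.println("acked=" + task.acked + " skipped=" + task.skipped);
    }
}
```

Note that a null metadata argument does not tell the task whether the record was filtered or failed to send, so the sketch treats both cases the same way.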

Other Changes

This update changes the behavior of errors.tolerance. Connectors that set errors.tolerance to all should expect the task to keep running after a producer exception instead of failing. See the compatibility and migration section below.

WorkerSourceTask.java

Code Block
            producer.send(
                    producerRecord,
                    (recordMetadata, e) -> {
                        if (e != null) {
                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
                                commitTaskRecord(preTransformRecord, null);
                            } else {
                                producerSendException.compareAndSet(null, e);
                            }

...

Existing source connectors that do not set errors.tolerance to all will be unaffected. The default behavior is still to kill the task in the event of a producer write failure.

(Hypothetical use case): Existing source connectors that set errors.tolerance to all and expect the task to die on producer failure will need to be updated to handle commitRecord being called with null metadata. It is unclear how many such connectors exist in the wild, because that setup would require human intervention on the source system to fix whatever problem caused the producer write to Kafka to fail in the first place.
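
The two compatibility cases above can be modelled with a small, simplified sketch of the failure branch of the producer callback. This is not the WorkerSourceTask code itself: ToleranceCallbackModel, onSendFailure, taskWouldFail and the local ToleranceType enum are hypothetical names introduced for illustration, and records are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class ToleranceCallbackModel {

    // Hypothetical stand-in for Connect's tolerance setting (errors.tolerance).
    enum ToleranceType { NONE, ALL }

    final ToleranceType tolerance;
    final AtomicReference<Throwable> producerSendException = new AtomicReference<>();
    final List<String> committedWithNullMetadata = new ArrayList<>();

    ToleranceCallbackModel(ToleranceType tolerance) {
        this.tolerance = tolerance;
    }

    // Models only the "e != null" branch of the producer callback.
    void onSendFailure(String record, Exception e) {
        if (tolerance == ToleranceType.ALL) {
            // Tolerated failure: the task is told about the record with null metadata.
            committedWithNullMetadata.add(record);
        } else {
            // Default: remember the first failure so the task can be failed later.
            producerSendException.compareAndSet(null, e);
        }
    }

    boolean taskWouldFail() {
        return producerSendException.get() != null;
    }

    public static void main(String[] args) {
        for (ToleranceType type : ToleranceType.values()) {
            ToleranceCallbackModel model = new ToleranceCallbackModel(type);
            model.onSendFailure("record-1", new RuntimeException("send failed"));
            System.out.println(type + ": taskWouldFail=" + model.taskWouldFail()
                    + ", committedWithNullMetadata=" + model.committedWithNullMetadata);
        }
    }
}
```

In this model, a connector left on the default tolerance sees no change, while a connector with errors.tolerance set to all receives a commit with null metadata instead of a task failure.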

Rejected Alternatives

Adding a new Connect worker configuration item:

...