Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
    /**
     * <p>
     * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
     * also called when a record is filtered by a transformation or when {@link ConnectorConfig} "errors.tolerance" is set to "all"
     * and thus will never be ACK'd by a broker.
     * In both cases {@code metadata} will be null.
     * </p>
     * <p>
     * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
     * automatically. This hook is provided for systems that also need to store offsets internally
     * in their own system.
     * </p>
     * <p>
     * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
     * not necessary to implement both methods.
     * </p>
     *
     * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
     * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
     * @throws InterruptedException
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata)
            throws InterruptedException {
        // by default, just call other method for backwards compatibility
        commitRecord(record);
    }

Other Changes


WorkerSourceTask.java

Code Block
			producer.send(
                    producerRecord,
                    (recordMetadata, e) -> {
                        if (e != null) {
                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
                                 commitTaskRecord(preTransformRecord, null);
                            }
                            else {
                                producerSendException.compareAndSet(null, e);
                            }

...