Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

There is extensive error handling/retry capability (KIP-298, KIP-458) but none of it applies to the step of the process if the actual write to Kafka fails.

Public Interfaces

...

WorkerConfig.java


Code Block
    public static final String IGNORE_PRODUCER_EXCEPTIONS_CONFIG = "errors.producer.ignore";
    public static final boolean IGNORE_PRODUCER_EXCEPTIONS_DEFAULT = false;
    private static final String IGNORE_PRODUCER_EXCEPTIONS_DOC = "If true, the connector will ignore producer exceptions. "
            + "The SourceRecord that failed to be written will be handed to commitRecord for handling by the connector.";

...
                .define(IGNORE_PRODUCER_EXCEPTIONS_CONFIG, Type.BOOLEAN, IGNORE_PRODUCER_EXCEPTIONS_DEFAULT,
                        Importance.MEDIUM, IGNORE_PRODUCER_EXCEPTIONS_DOC)
...


SourceTask.java

Code Block
    /**      
     * <p>
     * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
     * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker. It is also
     * called when {@link WorkerConfig} "errors.producer.ignore" is set to true.
     * In both cases {@code metadata} will be null.      
     * </p>
     * <p>
     * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
     * automatically. This hook is provided for systems that also need to store offsets internally
     * in their own system.
     * </p>
     * <p>
     * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
     * not necessary to implement both methods.
     * </p>
     *
     * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
     * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
     * @throws InterruptedException
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata)
            throws InterruptedException {
        // by default, just call other method for backwards compatibility
        commitRecord(record);
    }

...

Code Block
				producer.send(
                    producerRecord,
                    (recordMetadata, e) -> {
                        if (e != null) {
                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                            if (workerConfig.getBoolean(SourceConnectorConfigWorkerConfig.IGNORE_PRODUCER_EXCEPTIONS_CONFIG)) {
                                commitTaskRecord(preTransformRecord, null);
                            }
                            else {
                                producerSendException.compareAndSet(null, e);
                            }

...

I would like to add a configuration item to SourceConnectorConfigWorkerConfig: "errors.producer.ignore". 

...