
Status

Current state: Under Discussion

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We have Source Connectors bridging in data from systems that we do not explicitly control. Even with defensive settings, we may receive a message that is too large or otherwise un-processable for our configured Connect worker, Kafka broker, and other ecosystem components. The current behavior kills the connector when such an error arises.

KAFKA-8586 addressed the issue of silently failing to produce data by capturing the producer errors and failing the task.

Currently there is no way to handle these errors inside the connector.

We would like the task to stay alive and keep processing, logging the error or writing metadata to a topic for other systems to pick up for human intervention.

There is extensive error handling/retry capability (KIP-298, KIP-458), but none of it applies when the actual write to Kafka fails.

Public Interfaces

SourceTask.java:

    /**
     * <p>
     * This method gives SourceTasks the option to handle non-retriable producer exceptions rather than dying.
     *     Non-Retriable exceptions (fatal, the message will never be sent):
     *
     *      InvalidTopicException
     *      OffsetMetadataTooLargeException
     *      RecordBatchTooLargeException
     *      RecordTooLargeException
     *      UnknownServerException
     *      UnknownProducerIdException
     *      InvalidProducerEpochException
     * </p>
     * <p>
     * SourceTasks are not required to override this method; if it is not overridden, Kafka Connect
     * keeps the current behavior of failing the task on a non-retriable producer exception.
     * </p>
     *
     * @param sourceRecord {@link SourceRecord} Record that was not writable to Kafka
     * @param e {@link Exception} exception that was thrown when the producer attempted to write the SourceRecord to Kafka
     * @return boolean whether Connect should ignore this exception and continue processing
     */
    public boolean ignoreNonRetriableProducerException(SourceRecord sourceRecord, Exception e) {
        return false;
    }

WorkerSourceTask.java:

    ...
    producer.send(
            producerRecord,
            (recordMetadata, e) -> {
                if (e != null) {
                    log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
                    log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                    if (!task.ignoreNonRetriableProducerException(record, e)) {
                        producerSendException.compareAndSet(null, e);
                    }
                } else {
    ...

Proposed Changes

I would like to add a method to the SourceTask abstract class that gives Source Connectors the option, but not the obligation, to take action on non-retriable producer exceptions.

WorkerSourceTask will call this method to see whether it should "ignore" the exception and continue processing; otherwise it sets the producerSendException and the current behavior applies.
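As an illustration, a task that wants to tolerate oversized records could override the proposed hook as sketched below. The class names here are hypothetical stand-ins so the sketch compiles on its own; a real connector would extend org.apache.kafka.connect.source.SourceTask and match on exception types such as org.apache.kafka.common.errors.RecordTooLargeException:

```java
// Hypothetical stand-in for the producer exception type, so this sketch is
// self-contained; a real task would import it from org.apache.kafka.common.errors.
class RecordTooLargeException extends RuntimeException {
}

// Hypothetical stand-in for SourceTask, showing only the proposed method.
abstract class SourceTaskStandIn {
    // Proposed default: do not ignore, preserving the current fail-fast behavior.
    public boolean ignoreNonRetriableProducerException(Object sourceRecord, Exception e) {
        return false;
    }
}

// Example task that skips records that are too large but still dies on
// every other non-retriable producer exception.
class OversizedRecordTolerantTask extends SourceTaskStandIn {
    @Override
    public boolean ignoreNonRetriableProducerException(Object sourceRecord, Exception e) {
        if (e instanceof RecordTooLargeException) {
            // A real task could log metadata about sourceRecord here, or write
            // a small marker record to a side topic for human follow-up.
            return true;  // skip this record and keep the task alive
        }
        return false;     // all other fatal exceptions still fail the task
    }
}
```

A connector that leaves the method alone inherits the default `return false`, so only tasks that explicitly opt in see any change in behavior.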

Compatibility, Deprecation, and Migration Plan

By default this method returns false, so any Source Connector that does not override it keeps the current behavior of dying on a producer exception.

Rejected Alternatives

The connector could get access to the worker properties and do some validation:

Messages could then be intercepted before a write to Kafka is attempted. However, this requires a lot of a priori knowledge to be configured in advance and offers no leeway if an edge case is missed.

Keep current behavior:

There are cases where we want to note somewhere in Kafka that we failed to ingest a record. Writing a DLQ back to the source system may not be advisable or possible. In the particular case of a record that is too large, we have no way to capture metadata from the SourceRecord, or even to discard it with a log message.
