Status

Current state: Accepted

Discussion thread: here

JIRA: here

Released: 2.0.0

The proposal discussed in this KIP is implemented in this pull request.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Several new behaviors for handling and reporting errors are introduced, and all must be configured in the individual connector configurations.

Retry on Failure

Connect will retry the failed operation for a configurable total duration, starting with a fixed delay of 300 ms and backing off exponentially between retries. The total retry duration and the maximum backoff can be configured using the following new properties:

Configuration Name | Description | Default Value | Domain
errors.retry.timeout | The total duration (in milliseconds) a failed operation will be retried for. | 0 | [-1, 0, 1, ... Long.MAX_VALUE], where -1 means infinite duration.
errors.retry.delay.max.ms | The maximum delay between two consecutive retries (in milliseconds). Jitter will be added to the delay once this limit is reached to prevent any thundering herd issues. | 60000 | [1, ... Long.MAX_VALUE]
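
The backoff described above can be sketched as follows. This is a minimal illustration, not the Connect implementation: the class and method names are hypothetical, and the jitter rule (a uniformly random delay below the cap once the exponential delay exceeds it) is an assumed reading of "jitter will be added".

```java
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative sketch only; names and the exact jitter rule are assumptions. */
final class RetryBackoffSketch {
    // Fixed starting delay stated in the text above.
    static final long INITIAL_DELAY_MS = 300L;

    /**
     * Delay before retry number `attempt` (0-based): 300 ms doubled per attempt.
     * Once the doubled value exceeds maxDelayMs (errors.retry.delay.max.ms),
     * a random delay in [0, maxDelayMs) is used instead.
     */
    static long delayMs(int attempt, long maxDelayMs) {
        // Guard the shift so very large attempt counts cannot overflow a long.
        long exponential = attempt >= 40 ? Long.MAX_VALUE : INITIAL_DELAY_MS << attempt;
        if (exponential <= maxDelayMs) {
            return exponential;
        }
        return ThreadLocalRandom.current().nextLong(maxDelayMs);
    }

    public static void main(String[] args) {
        // With the default cap of 60000 ms, attempts 0..7 grow from 300 ms to 38400 ms;
        // later attempts fall back to a random delay below the cap.
        for (int attempt = 0; attempt < 10; attempt++) {
            System.out.println("attempt " + attempt + " -> " + delayMs(attempt, 60_000L) + " ms");
        }
    }
}
```

Randomizing the delay after the cap is reached keeps many tasks that failed at the same moment from retrying in lockstep.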

Task Tolerance Limits

Tolerate errors in a task up to a configurable limit. A failed operation is declared to be an error only if Connect has exhausted all retry options. If the task fails to perform an operation on a record and the error is within the tolerance limit, the record is skipped. Once the tolerance limit is reached, the task will fail. Tolerance limits can be configured using the following new properties:

Config Option | Description | Default Value | Domain
errors.tolerance | Fail the task if we exceed specified number of errors in the observed duration. | none | [none, all], where none fails the task on the first error and all tolerates every error.

Log Error Context

The error context and processing information can be logged along with the standard application logs using the following configuration properties:

Config Option | Description | Default Value | Domain
errors.log.enable | Log the error context along with the other application logs. This context includes details about the failed operation, and the record which caused the failure. | false | Boolean
errors.log.include.messages | Whether to include the Connect Record in every log. This is useful if users do not want records to be written to log files because they contain sensitive information, or are simply very large. If this property is disabled, Connect will still log some minimal information about the record (for example, the source partition and offset if it is a SourceRecord, and Kafka topic and offset if it is a SinkRecord). | false | Boolean

Dead Letter Queue (for Sink Connectors only)

For sink connectors, we will write the original record (from the Kafka topic the sink connector is consuming from) that failed in the converter or transformation step into a configurable Kafka topic.

Config Option | Description | Default Value | Domain
errors.deadletterqueue.topic.name | The name of the dead letter queue topic. If not set, this feature will be disabled. | "" | A valid Kafka topic name
errors.deadletterqueue.topic.replication.factor | Replication factor used to create the dead letter queue topic when it doesn't already exist. | 3 | [1 ... Short.MAX_VALUE]
errors.deadletterqueue.context.headers.enable | If true, multiple headers will be added to annotate the record with the error context. | false | Boolean

If the property errors.deadletterqueue.context.headers.enable is set to true, the following headers will be added to the produced raw message (only if they don't already exist in the message). All values will be serialized as UTF-8 strings.

Header Name | Description
__connect.errors.topic | Name of the topic that contained the message.
__connect.errors.partition | The numeric ID of the partition in the original topic that contained the message (encoded as a UTF-8 string).
__connect.errors.offset | The numeric value of the message offset in the original topic (encoded as a UTF-8 string).
__connect.errors.connector.name | The name of the connector which encountered the error.
__connect.errors.task.id | The numeric ID of the task that encountered the error (encoded as a UTF-8 string).
__connect.errors.stage | The name of the stage where the error occurred.
__connect.errors.class.name | The fully qualified name of the class that caused the error.
__connect.errors.exception.class.name | The fully qualified class name of the exception that was thrown during the execution.
__connect.errors.exception.message | The message in the exception.
__connect.errors.exception.stacktrace | The stacktrace of the exception.
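
Because every header value is a UTF-8 string, a consumer of the dead letter queue has to parse the numeric headers itself. The sketch below shows one way to do that. It models headers as a plain `Map<String, byte[]>` to stay self-contained, which is an assumption made for illustration; the class and method names are hypothetical, and a real consumer would read the values from the Kafka record's headers instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; the Map stands in for a Kafka record's headers. */
final class DlqHeaderSketch {
    /** Decodes a raw header value (UTF-8 bytes) to a String, or returns null if absent. */
    static String text(Map<String, byte[]> headers, String name) {
        byte[] raw = headers.get(name);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    /**
     * Builds a "topic-partition@offset" label for the record's original position.
     * Throws NumberFormatException if a numeric header is missing or malformed.
     */
    static String origin(Map<String, byte[]> headers) {
        String topic = text(headers, "__connect.errors.topic");
        int partition = Integer.parseInt(text(headers, "__connect.errors.partition"));
        long offset = Long.parseLong(text(headers, "__connect.errors.offset"));
        return topic + "-" + partition + "@" + offset;
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("__connect.errors.topic", "orders".getBytes(StandardCharsets.UTF_8));
        headers.put("__connect.errors.partition", "3".getBytes(StandardCharsets.UTF_8));
        headers.put("__connect.errors.offset", "1042".getBytes(StandardCharsets.UTF_8));
        System.out.println(origin(headers)); // prints orders-3@1042
    }
}
```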

Metrics

The following new metrics will monitor the number of failures, and the behavior of the response handler. Specifically, the following set of counters:

...

MBean name: kafka.connect:type=task-error-metrics,connector=([-.\w]+),task=([-.\w]+)

Metric/Attribute Name | Description | Implemented
total-record-failures | Total number of failures seen by this task. | 2.0.0
total-record-errors | Total number of errors seen by this task. | 2.0.0
total-records-skipped | Total number of records skipped by this task. | 2.0.0
total-retries | Total number of retries made by this task. | 2.0.0
total-errors-logged | The number of messages that were logged into either the dead letter queue or with Log4j. | 2.0.0
deadletterqueue-produce-requests | Number of produce requests to the dead letter queue. | 2.0.0
deadletterqueue-produce-failures | Number of records which failed to produce correctly to the dead letter queue. | 2.0.0
last-error-timestamp | The timestamp when the last error occurred in this task. | 2.0.0
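
As a rough illustration of how these attributes might be read, the sketch below queries the platform MBean server with a wildcard pattern over the MBean name given above. It assumes the code runs inside the same JVM as the Connect worker; reading from another process would need a remote JMX connection instead. The class and method names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Illustrative sketch only; assumes it runs in the Connect worker's JVM. */
final class TaskErrorMetricsSketch {
    /** Pattern matching the task-error-metrics MBean of every connector and task. */
    static ObjectName allTasksPattern() throws MalformedObjectNameException {
        return new ObjectName("kafka.connect:type=task-error-metrics,connector=*,task=*");
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Returns an empty set when no Connect tasks are registered in this JVM.
        Set<ObjectName> names = server.queryNames(allTasksPattern(), null);
        for (ObjectName name : names) {
            Object skipped = server.getAttribute(name, "total-records-skipped");
            System.out.println(name.getKeyProperty("connector") + "/"
                    + name.getKeyProperty("task") + " skipped=" + skipped);
        }
    }
}
```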

Proposed Changes

A connector consists of multiple stages. For source connectors, Connect retrieves the records from the connector, applies zero or more transformations, uses the converters to serialize each record’s key, value, and headers, and finally writes each record to Kafka. For sink connectors, Connect reads the topic(s), uses the converters to deserialize each record’s key, value, and headers, and for each record applies zero or more transformations and delivers the records to the sink connector. In this proposal, we will specifically deal with the following failure scenarios which can occur during these stages:

Operation | Will Retry? | Tolerated Exceptions
Transformation | only on org.apache.kafka.connect.errors.RetriableException | java.lang.Exception
Key, Value and Header Converter | only on org.apache.kafka.connect.errors.RetriableException | java.lang.Exception
Kafka Produce and Consume | only on org.apache.kafka.common.errors.RetriableException | only on org.apache.kafka.connect.errors.RetriableException, fail task otherwise.
put() in SinkTask and poll() in SourceTask | only on org.apache.kafka.connect.errors.RetriableException | only on org.apache.kafka.connect.errors.RetriableException, fail task otherwise.

There are two behavioral changes introduced by this KIP. First, a failure in any stage will be reattempted, thereby “blocking” the connector. This helps in situations where time is needed to manually update an external system, such as manually correcting a schema in the Schema Registry. More complex causes, such as requiring code changes or corruptions in the data that can’t be fixed externally, will require the worker to be stopped, the data to be fixed, and then the connector to be restarted. In the case of data corruption, the topic might need to be cleaned up too. If the retry limit for a failure is reached, then the tolerance limit is used to determine whether the record should be skipped or the task should be killed.

The second behavioral change is in how we report these failures. Currently, only the exception which kills the task is written to the application logs. With the additions presented in this KIP, we log details about the failed operation along with the bad record. We are also introducing an option to write bad records into a dead letter queue for sink connectors. This writes the original key, value, and headers of failed records into a configured Kafka topic.
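
The retry-then-tolerate decision for a transformation or converter stage might look roughly like the sketch below. It is illustrative only and not the Connect implementation: the nested `RetriableException` is a stand-in for org.apache.kafka.connect.errors.RetriableException, the method and parameter names are hypothetical, a fixed delay replaces the exponential backoff to keep the example short, and returning null is an assumed way to signal "skip this record".

```java
import java.util.concurrent.Callable;

/** Illustrative sketch only; not the Connect implementation. */
final class RetryThenTolerateSketch {
    /** Stand-in for org.apache.kafka.connect.errors.RetriableException. */
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    /**
     * Runs op, retrying retriable failures until retryTimeoutMs has elapsed.
     * Returns the result, or null when the record should be skipped.
     * Rethrows the exception when the error is not tolerated (the task fails).
     */
    static <T> T execute(Callable<T> op, long retryTimeoutMs, long retryDelayMs, boolean tolerateAll)
            throws Exception {
        // A negative timeout means retry indefinitely, mirroring -1 for errors.retry.timeout.
        long deadline = retryTimeoutMs < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + retryTimeoutMs;
        while (true) {
            try {
                return op.call();
            } catch (RetriableException e) {
                if (System.currentTimeMillis() >= deadline) {
                    if (tolerateAll) return null; // retries exhausted: skip the record
                    throw e;                      // retries exhausted: fail the task
                }
                Thread.sleep(retryDelayMs);       // wait, then reattempt the same operation
            } catch (Exception e) {
                if (tolerateAll) return null;     // non-retriable but tolerated: skip the record
                throw e;                          // non-retriable: fail the task
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = execute(() -> {
            if (++calls[0] < 3) throw new RetriableException("transient failure");
            return "converted";
        }, 10_000L, 5L, false);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints converted after 3 attempts
    }
}
```

With a timeout of 0 the first failure is final, which matches the "disable retries" setting, and the tolerance flag alone decides between skipping the record and failing the task.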

...

# disable retries on failure
errors.retry.timeout=0

# do not log errors and their contexts
errors.log.enable=false

# do not record errors in a dead letter queue topic (an unset name disables the feature)
errors.deadletterqueue.topic.name=

# Fail on first error
errors.tolerance=none

Example 2: Record and Skip

...

# retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
errors.retry.timeout=600000
errors.retry.delay.max.ms=30000

# log error context along with application logs, but do not include configs and messages
errors.log.enable=true
errors.log.include.configs=false
errors.log.include.messages=false

# produce error context into the Kafka topic
errors.deadletterqueue.topic.name=my-connector-errors

# Tolerate all errors.
errors.tolerance=all

Compatibility, Deprecation, and Migration Plan

...

Defining and configuring properties in the worker config: Firstly, it is out of place to specify error handling properties in a worker config when they will never be used by a worker (since all the failures are handled at the connector level). Secondly, adding inheritance in configurations adds a level of complexity which can be avoided at this stage of development.

Write records that fail in the put() step of a sink connector to the dead letter queue: since sink connectors can choose to batch records in a put() method, it is not clear which errors are caused by which records (they might be caused by records that were just passed to put(), or by earlier records that were processed later). Also, there might be connection issues that are not handled by the connector and are simply bubbled up as an IOException (for example). Effectively, errors sent back to the framework from the put() method currently do not have sufficient context to determine the problematic records (if any). Addressing these issues would need a separate KIP.