You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 37 Next »

Status

Current state"Under Discussion"

Discussion thread: here

JIRA: here

The proposal discussed in this KIP is implemented in this pull request.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

There are several places in Connect during which failures may occur. Any failure to deserialize, convert, process, or read/write a record in Kafka Connect can cause a task to fail. Although some errors can be addressed with transformations or custom converters that check for malformed data, in general it is difficult to ensure correct and valid data or to tell Connect to skip problematic records.

Connect should allow users to configure how failures should be handled during all phases of processing records. Some failures such as the lack of availability of some external components may be resolved by simply retrying, while other errors should be recorded and the problem records simply skipped. Where possible, Connect should be able to record the error and optionally include the problematic records and configuration states of the connector, transform and converter. Since no single solution works for everyone, all of this error handling behavior should be configurable. 

This proposal aims to change the Connect framework to allow it to automatically deal with errors while processing records in a Connector. By default, Connect will fail immediately when an error occurs, which is the previous Connect behavior. Therefore, all new behaviors must be explicitly enabled.

Public Interfaces

Several new behaviors for handling and reporting errors are introduced, and all must be configured in the individual connector configurations.

Retry on Failure

Retry the failed operation a configurable number of times, with exponential backoff between each retry, based on a fixed starting delay value. The number of retries and backoff can be configured using the following new properties:

Configuration NameDescriptionDefault ValueDomain
errors.retry.limitThe maximum number of retries before failing.0[-1, 0, 1, ... Long.MAX_VALUE], where -1 means infinite retries.
errors.retry.delay.max.msThe maximum duration between two consecutive retries (in milliseconds).60000[1, ... Long.MAX_VALUE]

Task Tolerance Limits

Tolerate up to a configurable number of errors in a task. A failed operation is declared to be an error only if Connect has exhausted all retry options. If the task fails to successfully perform an operation on a record within tolerance limit, the record is skipped. Once the tolerance limit (overall or rate) is reached, the task will fail. Tolerance limits can be configured using the following new properties:

Config OptionDescriptionDefault ValueDomain
errors.tolerance.limitFail the task if we exceed specified number of errors overall.0[-1, 0, 1, ... Long.MAX_VALUE], where a value of -1 means infinite failures will be tolerated.
errors.tolerance.rate.limitFail the task if we exceed specified number of errors in the observed duration.0[-1, 0, 1, ... Long.MAX_VALUE], where a value of -1 means infinite failures will be tolerated in the observed window.
errors.tolerance.rate.durationThe duration of the window for which we will monitor errors.minuteminute, hour, day

Log Error Context

The error context and processing information can be logged along with the standard application logs using the following configuration properties:

Config OptionDescriptionDefault ValueDomain
errors.log.enableLog the error context (the base information, configs of various stages and the failed message) along with the other application logs.falseBoolean
errors.log.include.configsInclude the (worker, connector) configs in the log.falseBoolean
errors.log.include.messagesInclude the Connect Record which failed to process in the log.falseBoolean

Produce Error Context to a Dead Letter Queue (Kafka Topic)

Produce a message which contains the processing context and error information to a Kafka topic. By default, the bootstrap.servers property in the worker config will be used to locate the Kafka cluster and the producer.* properties in the worker config will be used to configure the producer. This can be overridden using the errors.dlq.producer.* configs as shown below:

Config OptionDescriptionDefault ValueDomain
errors.deadletterqueue.enableWrite the error context into a Kafka topicfalseBoolean
errors.deadletterqueue.topic.nameThe name of the topic where these messages are written to""A valid Kafka topic name
errors.deadletterqueue.topic.partitionsNumber of partitions for this topic5[1, 2 .. Integer.MAX_INT]
errors.deadletterqueue.topic.replication.factorThe replication factor for this topic3[1, 2 .. Short.MAX_SHORT]
errors.deadletterqueue.include.configsInclude the (worker, connector) configs in the log.falseBoolean
errors.deadletterqueue.include.messagesInclude the Connect Record which failed to process in the log.falseBoolean
errors.deadletterqueue.producer.*Config for the KafkaProduce to produce to this topic (for example: errors.dlq.producer.bootstrap.servers will set the bootstrap servers of the Kafka cluster).  

Message Format for Error Context

Connect will use the JsonConverter to serialize the error context and log it in JSON format. The JSON object will contain the following fields:

TypeNameDescription
RecordrecordThe record which caused the exception (if available).
Stage[] stagesAn ordered array of stages.
intindexA pointer to a stage in the array where the failure occurred.
stringexceptionThe exception, and stacktrace for the failure (if available).
int32attemptNumber of attempts made to correct the failure.
stringtask_idThe id of the task which where the failure occurred.
int64time_of_errorThe epoch time of failure.

where Record encodes either the Sink or Source Record which was input to the failed operation.

TypeNameDescription
stringtopicthe topic which this message was read from or will be written to.
int32partitionthe partition which this message was read from or will be written to.
int64 timestampthe timestamp of the Kafka message.
SchemaAndValuekeythe key of the message.
SchemaAndValuevaluethe value of the message.
SchemaAndValueheadersthe headers of the message.
int64offsetthe offset of the message in the Kafka topic (available only in sink connectors).
stringtimestamp_typethe type of the timestamp. One of: NO_TIMESTAMP_TYPE, CREATE_TIME or LOG_APPEND_TIME (available only in sink connectors).
map<string, string>source_partitioninformation about the source partition from where this message was created from (available only in source connectors and contents are specific to the implementation).
map<string, string>source_offsetinformation about the source offset from where this message was created from (available only in source connectors and contents are specific to the implementation).

and SchemaAndValue is:

TypeNameDescription
stringschemathe data type of the value. For example, "BYTES", or "STRUCT".
string valuea string encoding of the object created using the JsonConverter.

Finally, the Stage object describes the different operations performed in a Connector pipeline:

TypeNameDescription
stringtypethe operation type. For example, TRANSFORMATION, KEY_CONVERTER or TASK_PULL.
map<string, string> configthe properties used to configure this object.
stringclassthe classname of the object performing this operation.

Metrics

The following new metrics will monitor the number of failures, and the behavior of the response handler. Specifically, the following set of counters:

  • for counting the failures at various stages.
  • for how the framework handled this error. 
    • number of messages retried 
    • number of records skipped 
    • number of messages logged to file or the dead letter queue

MBean namekafka.connect:type=task-error-metrics,connector=([-.\w]+),task=([-.\w]+)

Metric/Attribute Name

Description

Implemented
total-record-failuresTotal number of failures seen by this task.2.0.0
total-records-skippedTotal number of records skipped by this task.2.0.0
total-retriesTotal number of retries made by this task.2.0.0
total-failures-loggedThe number of messages that was logged into either the dead letter queue or with Log4j.2.0.0
deadletterqueue-records-producedNumber of records successfully produced to the dead letter queue.2.0.0
deadletterqueue-produce-failuresNumber of records which failed to produce correctly to the dead letter queue.2.0.0
last-failure-timestampThe timestamp when the last failure occurred in this task.2.0.0

Proposed Changes

A connector consists of multiple stages. For source connectors, Connect retrieves the records from the connector, applies zero or more transformations, uses the converters to serialize each record’s key, value, and headers, and finally writes each record to Kafka. For sink connectors, Connect reads the topic(s), uses the converters to deserialize each record’s key, value, and headers, and for each record applies zero or more transformations and delivers the records to the sink connector. In this proposal, we will specifically deal with the following failure scenarios which can occur during these stages:

OperationException Handled
Transformation
java.lang.Exception
Key, Value and Header Converter
java.lang.Exception
Kafka Produce and Consumeorg.apache.kafka.common.errors.RetriableException
Other Operationsorg.apache.kafka.connect.errors.RetriableException

There are two behavioral changes introduced by this KIP. First, a failure in any stage will be reattempted, thereby “blocking” the connector. This helps in situations where time is needed to manually update an external system, such as manually correcting a schema in the Schema Registry. More complex causes, such as requiring code changes or corruptions in the data that can’t be fixed externally, will require the worker to be stopped, data to be fixed and then the Connector to be restarted. In the case of data corruption, the topic might need to be cleaned up too. If the retry limit for a failure is reached, then the tolerance limit is used to determine if this record should be skipped, or if the task is to be killed.

The second behavioral change is introduced in how we log these failures. Currently, only the exception which kills the task is written with the application logs. With the additions presented in this KIP, the following context will be logged in JSON format: 

  1. The record which failed to process (if available). Any binary data will be base64 encoded. 
  2. The Exception along with the stack trace.
  3. Information about the stage which failed to process the record (for example: classname of the transform and its configs).
  4. The various stages in the connector (names of classes) and order of processing.
  5. The number of attempts, and time taken for these attempts.

Logging errors with application logs is convenient and requires no additional setup. The log messages are informative but not made easily actionable. For example, it is hard to collect log files from various machines, parse them and take appropriate actions. By introducing a dead letter queue, we can overcome these problems. We implement a dead letter queue using a single Kafka topic per connector (as determined by the errors.deadletterqueue.topic.name property). Error context will be logged as JSON strings into this topic. An example JSON record is shown below (example 4). 

While logging the error context, it might be worthwhile to take precautions to hide sensitive content. For example, some of the configs might contain sensitive information such as usernames or passwords. To prevent logging critical information, we provide configuration options to disable logging the messages (errors.dlq.include.messages) and configs (errors.dlq.include.configs).

Example 1: Fail Fast

To maintain backward compatibility, by default a Connector task will fail immediately upon an error or exception. This reflects the same behavior as earlier releases, ensuring that existing installations work the same way. Although it is not necessary to add extra configuration properties, the following properties may be added to a connector configuration to achieve this older behavior:

# disable retries on failure
errors.retries.limit=0

# do not log the error and their contexts
errors.log.enable=false

# do not record errors in a dead letter queue topic
errors.dlq.enable=false

# Fail on first failure
errors.tolerance.limit=0

Example 2: Record and Skip

The following configuration shows how to setup error handling with multiple retries, logging both to the application logs and a Kafka topic with infinite tolerance:

# retry at most 100 times waiting up to 5 minutes between consecutive failures
errors.retries.limit=100
errors.retries.delay.max.ms=300000

# log error context along with application logs, but do not include configs and messages
errors.log.enable=true
errors.log.include.configs=false
errors.log.include.messages=false

# produce error context into the Kafka topic
errors.dlq.enable=true
errors.dlq.topic.name=my-connector-errors
errors.dlq.topic.partitions=25
errors.dlq.topic.replication.factor=3
errors.dlq.include.configs=true
errors.dlq.include.messages=true

# Tolerate all errors.
errors.tolerance.limit=-1
errors.tolerance.rate.limit=-1
errors.tolerance.rate.duration.ms=60000

Example 3: Record to separate Kafka cluster

In the previous example, errors are recorded in the log and in a separate "dead letter queue" (DLQ) Kafka topic in the same broker cluster that Connect is using for its internal topics. It is possible to record the errors in a DLQ on a separate Kafka cluster by defining extra errors.dlq.producer.* configuration properties. Here is the same set of connector configuration properties as in Example 2, except with the additional errors.dlq.producer.* properties:

# retry up to 100 times waiting up to 5 minutes between consecutive failures
errors.retries.limit=100
errors.retries.delay.max.ms=300000

# log error context along with application logs, but do not include configs and messages
errors.log.enable=true
errors.log.include.configs=false
errors.log.include.messages=false

# produce error context into a secure Kafka topic
errors.dlq.enable=true
errors.dlq.topic.name=my-connector-errors
errors.dlq.topic.partitions=25
errors.dlq.topic.replication.factor=3
errors.dlq.include.configs=true
errors.dlq.include.messages=true
errors.dlq.producer.bootstrap.servers=secure-broker:9093
errors.dlq.producer.acks = 0
errors.dlq.producer.security.protocol=SSL
errors.dlq.producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
errors.dlq.producer.ssl.truststore.password=test1234
errors.dlq.producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
errors.dlq.producer.ssl.keystore.password=test1234
errors.dlq.producer.ssl.key.password=test1234

# Tolerate all errors
errors.tolerance.limit=-1
errors.tolerance.rate.limit=-1
errors.tolerance.rate.duration.ms=60000

Example 4: Error context formatted as JSON

The following example shows how the error context will appear if an error occurs when trying to deserialize a byte array using a JsonConverter in a FileSink connector which uses a ReplaceField transformation. Note that there were 10 reattempts, and a final error was declared at time '1526518715403'. The original record, shown here in a base64 encoded string at record.value.object is {"a: 10}, while the key is a struct {"id": 1012}. The exception we are seeing is the missing double quote after the field name. It must be noted that if we had a org.apache.kafka.connect.data.Struct instead of a raw object when the error occurred, it would be formatted appropriately. In order to apply corrections to bad records, the developer can read records from the dead letter queue topic, analyze the exception, deserialize the record appropriately (using the schema fields in the record node to guide the deserialization), apply the fixes and write a corrected record to the original Kafka topic which is used to feed records to the sink connector. In the example below, one would fix the quote and write the record back to the connect-test topic.

For errors in a source connector, the process is similar, but care needs to be taken while writing back to the source. For example, if the source is a SQL database, then we can append records to a table so that it can be picked up in the next SourceTask#poll() cycle. Furthermore, source records can store lineage information with the record (via the sourcePartition and sourceOffset fields in SourceRecords). These would be logged with the context, and can serve to determine how to insert the corrected records. For example, the JDBC source connector puts the row information with every message (the table name, primary key, timestamp fields), and this can be used to update the bad rows. Please note that the changes introduced in this KIP only present the errors. Applying the corrections themselves is beyond the scope of this KIP. 

 

{
  "record": {
    "topic": "connect-test",
    "timestamp": 1526518349968,
    "timestamp_type": "CREATE_TIME",
    "offset": 918264,
    "partition": 7,
    "key": {
      "schema": "STRUCT",
      "object": "{\"id\":1012}"
    },
    "value": {
      "schema": "BYTES",
      "object": "eyJhOiAxMH0="
    }
  },
  "index": 1,
  "exception": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \norg.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\t...",
  "attempt": 10,
  "task_id": "my-connector-1",
  "time_of_error": 1526518715403,
  "stages": [
    {
      "type": "KEY_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "VALUE_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "TRANSFORMATION",
      "config": {
        "whitelist": "a,b"
      },
      "class": "org.apache.kafka.connect.transforms.ReplaceField$Value"
    },
    {
      "type": "TASK_PUT",
      "config": {
        "name": "local-file-sink",
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test.sink.txt",
        "topics": "connect-test"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    }
  ]
}

 

Compatibility, Deprecation, and Migration Plan

The current behavior in Connect is to kill the task on the first error in any stage. As mentioned above, this will remain the default behavior if connector configurations are not changed.

Rejected Alternatives

Correcting records in the handler: the handler will not be responsible for providing corrected records. For sink records, the user can correct records and write the corrected records back to the origin Kafka topics using the dead letter queue mentioned above. For source records, the user can analyze the error messages and fix the data at the source.

Allow per-stage error handler: This would have provided finer grained error handling. But comes at the expense of more configuration, and users having to ensure that the different stages are using compatible error handling. It is also not evident that this is more useful than simply taking the most flexible of handlers and applying it across all stages.

Interceptors for Erroneous Records: Similar to ProducerInterceptor and ConsumerInterceptor, we could potentially add ErrorInterceptors too. But given that the handlers subsumes most of the functionalities here, we decided to not provide this feature.

Defining and configuring properties in the worker config: Firstly, it is out of place to specify error handling properties in a worker config when it will never be used by a worker (since all the failures are handled at the Connector level). Secondly, adding inheritance in configurations adds a level of complexity which can be avoided at this stage of development. 

  • No labels