...

Discussion thread: here

JIRA: here

The proposal discussed in this KIP is implemented in this pull request.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Logging errors with the application logs is convenient and requires no additional setup, but the log messages are informative without being easily actionable. For example, it is hard to collect log files from various machines, parse them, and take appropriate action. By introducing a dead letter queue, we can overcome these problems. We implement the dead letter queue using a single Kafka topic per connector (as determined by the errors.dlq.topic.name property), and the error context will be written to this topic as JSON strings.

For sink connectors, a developer can consume bad records from this topic, correct them, and write the corrected records back to the original Kafka topics, as shown in the sketch below. Similarly, for source connectors, the developer can write the corrected records back to the original source.
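
For sink connectors, a minimal replay loop might look like the following sketch. It assumes the DLQ topic name (my-connector-errors) and the original topic (connect-test) used elsewhere in this document; the correct method is a hypothetical placeholder for whatever fix-up logic applies to the data.

Code Block
languagejava
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DlqReplay {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "dlq-replay");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("my-connector-errors"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // each DLQ record value is a JSON error context (see Example 4);
                    // repair the embedded record and replay it to the original topic
                    String corrected = correct(record.value());
                    producer.send(new ProducerRecord<>("connect-test", corrected));
                }
            }
        }
    }

    // hypothetical placeholder: parse the JSON error context and repair the record
    private static String correct(String errorContextJson) {
        return errorContextJson;
    }
}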

While writing the error context to the dead letter queue, it might be worthwhile to take precautions to hide sensitive content. For example, some of the configs might contain sensitive information such as usernames or passwords. To prevent exposing such information, we provide configuration options to exclude the messages (errors.dlq.include.messages) and the configs (errors.dlq.include.configs) from the error context.
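
For example, a deployment that must not write record contents or connector configs into the dead letter queue can keep the queue enabled while excluding both:

Code Block
languagetext
# keep the dead letter queue, but exclude potentially sensitive data
errors.dlq.enable=true
errors.dlq.topic.name=my-connector-errors
errors.dlq.include.configs=false
errors.dlq.include.messages=false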

Example 1: Fail Fast

To maintain backward compatibility, a connector task will by default fail immediately upon an error or exception. This reflects the behavior of earlier releases, ensuring that existing installations work the same way. No extra configuration properties are necessary, but the following may be added to a connector configuration to make this older behavior explicit:

...

Code Block
languagetext
# retry up to 100 times, waiting up to 5 minutes between consecutive failures
errors.retries.limit=100
errors.retries.delay.max.ms=300000

# log error context along with application logs, but do not include configs and messages
errors.log.enable=true
errors.log.include.configs=false
errors.log.include.messages=false

# produce error context into a secure Kafka topic
errors.dlq.enable=true
errors.dlq.topic.name=my-connector-errors
errors.dlq.topic.partitions=25
errors.dlq.topic.replication.factor=3
errors.dlq.include.configs=true
errors.dlq.include.messages=true
errors.dlq.producer.bootstrap.servers=secure-broker:9093
errors.dlq.producer.acks=0
errors.dlq.producer.security.protocol=SSL
errors.dlq.producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
errors.dlq.producer.ssl.truststore.password=test1234
errors.dlq.producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
errors.dlq.producer.ssl.keystore.password=test1234
errors.dlq.producer.ssl.key.password=test1234

# Tolerate all errors
errors.tolerance.limit=-1
errors.tolerance.rate.limit=-1
errors.tolerance.rate.duration.ms=60000
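
Note that because the error context above is produced to a broker over SSL, any client consuming the dead letter queue needs matching security settings. A minimal sketch using standard Kafka consumer properties, mirroring the truststore used above:

Code Block
languagetext
# client settings for consuming the secure dead letter queue topic
bootstrap.servers=secure-broker:9093
group.id=dlq-reader
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234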

Example 4: Error context formatted as JSON

The following example shows how the error context will appear if an error occurs while deserializing a byte array using a JsonConverter in a FileStreamSink connector that uses a ReplaceField transformation. Note that the operation was attempted 10 times ("attempt": 10), and the final error was declared at time 1526518715403.

Code Block
languagejs
{
  "record": {
    "topic": "connect-test",
    "timestamp": 1526518349968,
    "partition": 7,
    "value": {
      "schema": "BYTES",
      "object": "IllYQmhZMmhsSUd0aFptdGhJR052Ym01bFkzUT0i"
    }
  },
  "stage": {
    "type": "VALUE_CONVERTER",
    "config": {
      "converter.type": "value",
      "schemas.enable": "false"
    },
    "class": "org.apache.kafka.connect.json.JsonConverter"
  },
  "index": 0,
  "exception": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \norg.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\t...",
  "attempt": 10,
  "task_id": "my-connector-1",
  "time_of_error": 1526518715403,
  "stages": [
    {
      "type": "VALUE_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "TRANSFORMATION",
      "config": {
        "renames": "field:renamed_field"
      },
      "class": "org.apache.kafka.connect.transforms.ReplaceField$Value"
    },
    {
      "type": "TASK_PUT",
      "config": {
        "name": "local-file-sink",
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test.sink.txt",
        "topics": "connect-test"
      },
      "class": "org.apache.kafka.connect.file.FileStreamSinkTask"
    }
  ]
}
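
Since the record value above has schema BYTES, its object field carries the raw record bytes Base64-encoded. A minimal sketch for recovering the original payload, assuming only that encoding (the encoded value is copied from the example above):

Code Block
languagejava
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DecodeDlqPayload {
    public static void main(String[] args) {
        // the "object" field from the error context above
        String encoded = "IllYQmhZMmhsSUd0aFptdGhJR052Ym01bFkzUT0i";
        // decode back to the raw bytes that the JsonConverter failed to deserialize
        byte[] raw = Base64.getDecoder().decode(encoded);
        System.out.println(new String(raw, StandardCharsets.UTF_8));
    }
}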


Compatibility, Deprecation, and Migration Plan

...