...

Logging errors with the application logs is convenient and requires no additional setup, but the log messages, while informative, are not easily actionable. For example, it is hard to collect log files from various machines, parse them and take appropriate actions. By introducing a dead letter queue, we can overcome these problems. We implement the dead letter queue as a single Kafka topic per connector (as determined by the errors.deadletterqueue.topic.name property). The error context is logged as a JSON string into this topic. For sink connectors, a developer can consume bad records from this topic, correct them and write the corrected records back to the original Kafka topics. Similarly, for source connectors, the developer can write the corrected records back to the original source. An example JSON record is shown below (example 4).

While logging the error context, it might be worthwhile to take precautions to hide sensitive content. For example, some of the configs might contain sensitive information such as usernames or passwords. To avoid logging such critical information, we provide configuration options to disable logging the message contents (errors.dlq.include.messages) and the configs (errors.dlq.include.configs).
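
As an illustration only, a minimal sketch of a file sink connector configuration with these options set might look as follows. The connector name, file and topics values mirror the local-file-sink example used later in this document; the dead letter queue topic name and the choice to disable both logging options are arbitrary assumptions for this sketch.

Code Block
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
# per-connector dead letter queue topic for records that fail processing
errors.deadletterqueue.topic.name=dlq-local-file-sink
# avoid writing record contents and connector configs into the logged error context
errors.dlq.include.messages=false
errors.dlq.include.configs=false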

...

The following example shows how the error context will appear if an error occurs while trying to deserialize a byte array using a JsonConverter in a FileSink connector that uses a ReplaceField transformation. Note that there were 10 attempts, and the final error was declared at time 1526518715403. The original record, shown here as a base64 encoded string at record.value.object, is {"a: 10}. The exception we are seeing is caused by the missing double quote after the field name. Note that if we had an org.apache.kafka.connect.data.Struct instead of a raw object when the error occurred, it would be formatted appropriately. To apply corrections to bad records, the developer can read records from the dead letter queue topic, analyze the exception, deserialize the record appropriately (using the schema field in the record node to guide the deserialization), apply the fixes and write a corrected record to the original Kafka topic that feeds records to the sink connector. In the example below, one would add the missing quote and write the record back to the connect-test topic; a sketch of such a correction loop is shown after the example record.

For errors in a source connector, the process is similar, but care needs to be taken while writing back to the source. For example, if the source is a SQL database, we can append records to a table so that they are picked up in the next SourceTask#poll() cycle. Furthermore, source records carry lineage information (via the sourcePartition and sourceOffset fields in SourceRecord). These are logged with the error context and can serve to determine how to insert the corrected records. For example, the JDBC source connector stores row information (the table name, primary key and timestamp fields) with every message, and this can be used to update the bad rows.
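
As a rough illustration of this write-back step (not part of the proposal itself), the sketch below appends a corrected row to a source table with plain JDBC so that the source connector picks it up on its next poll() cycle. The connection URL, table and column names are hypothetical; in practice, the corrected values and the target table would be recovered from the error context and the lineage information described above.

Code Block
languagejava
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SourceRowFixer {

    public static void main(String[] args) throws Exception {
        // hypothetical source database; in practice taken from the connector's configuration
        String url = "jdbc:postgresql://localhost:5432/inventory";
        try (Connection conn = DriverManager.getConnection(url, "user", "password");
             PreparedStatement stmt = conn.prepareStatement(
                     "INSERT INTO customers (id, name, modified) VALUES (?, ?, CURRENT_TIMESTAMP)")) {
            // values corrected by hand after inspecting the dead letter queue record;
            // the table and primary key come from the logged sourcePartition/sourceOffset
            stmt.setLong(1, 42L);
            stmt.setString(2, "corrected value");
            stmt.executeUpdate();
        }
    }
}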

Code Block
languagejs
{
  "record": {
    "topic": "myconnect-topictest",
    "timestamp": 1526518349968,
    "offset": 918264
    "partition": 7,
    "value": {
      "schema": "SCHEMA{BYTES}",
      "object": "IllYQmhZMmhsSUd0aFptdGhJR052Ym01bFkzUT0ieyJhOiAxMH0="
    }
  },
  "stage": {
    "type": "VALUE_CONVERTER",
    "config": {
      "converter.type": "value",
      "schemas.enable": "false"
    },
    "class": "org.apache.kafka.connect.json.JsonConverter"
  },
  "index": 0,
  "exception": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \norg.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\t...",
  "attempt": 10,
  "task_id": "my-connector-1",
  "time_of_error": 1526518715403,
  "stages": [
    {
      "type": "VALUE_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "TRANSFORMATION",
      "config": {
        "whitelist": "a"
      },
      "class": "org.apache.kafka.connect.transforms.ReplaceField$Value"
    },
    {
      "type": "TASK_PUT",
      "config": {
        "name": "local-file-sink",
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test.sink.txt",
        "topics": "connect-test"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    }
  ]
}
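
Following the correction workflow described above, the illustrative snippet below consumes error-context records from the dead letter queue, decodes the base64-encoded value, adds the missing quote and writes the corrected record back to the topic that feeds the sink connector. The class name and the dead letter queue topic (dlq-local-file-sink, i.e. whatever errors.deadletterqueue.topic.name was set to) are assumptions for this sketch, not part of the proposal.

Code Block
languagejava
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.Properties;

public class DeadLetterQueueFixer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "dlq-fixer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        ObjectMapper mapper = new ObjectMapper();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // the dead letter queue topic configured via errors.deadletterqueue.topic.name
            consumer.subscribe(Collections.singletonList("dlq-local-file-sink"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // each record value is an error context JSON string like the example above
                    JsonNode context = mapper.readTree(record.value());
                    // recover the original bytes from the logged error context
                    byte[] original = Base64.getDecoder().decode(
                            context.at("/record/value/object").asText());
                    // apply the fix; here we simply add the quote missing after the field name
                    String corrected = new String(original).replace("{\"a: ", "{\"a\": ");
                    // write the corrected record back to the topic that feeds the sink connector
                    String topic = context.at("/record/topic").asText();
                    producer.send(new ProducerRecord<>(topic, corrected));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}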


...