...

Discussion thread: here

JIRA: here

The proposal discussed in this KIP is implemented in this pull request.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The following example shows the error context that is produced when an error occurs while deserializing a byte array using a JsonConverter in a FileSink connector that uses a ReplaceField transformation. Note that there were 10 retry attempts, and the final error was recorded at time 1526518715403. The original record value, shown here as a base64-encoded string at record.value.object, is {"a: 10}, while the key is a struct {"id": 1012}. The exception is caused by the missing double quote after the field name. Note that if an org.apache.kafka.connect.data.Struct had been present instead of a raw object when the error occurred, it would have been formatted appropriately. To apply corrections to bad records, a developer can read records from the dead letter queue topic, analyze the exception, deserialize the record appropriately (using the schema fields in the record node to guide the deserialization), apply the fixes, and write a corrected record back to the original Kafka topic that feeds records to the sink connector. In the example below, one would add the missing quote and write the record back to the connect-test topic.

...

 

Code Block
languagejava
{
  "record": {
    "topic": "connect-test",
    "timestamp": 1526518349968,
    "offset": 918264,
    "partition": 7,
    "key": {
      "schema": "SCHEMA{STRUCT}",
      "object": {"id": 1012}
    },
    "value": {
      "schema": "SCHEMA{BYTES}",
      "object": "eyJhOiAxMH0="
    }
  },
  "stage": {
    "type": "VALUE_CONVERTER",
    "config": {
      "converter.type": "value",
      "schemas.enable": "false"
    },
    "class": "org.apache.kafka.connect.json.JsonConverter"
  },
  "index": 1,
  "exception": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \norg.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\t...",
  "attempt": 10,
  "task_id": "my-connector-1",
  "time_of_error": 1526518715403,
  "stages": [
    {
      "type": "KEY_CONVERTER",
      "config": {
        "converter.type": "key",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "VALUE_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "TRANSFORMATION",
      "config": {
        "whitelist": "f1,f2"
      },
      "class": "org.apache.kafka.connect.transforms.ReplaceField$Value"
    },
    {
      "type": "TASK_PUT",
      "config": {
        "name": "local-file-sink",
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test.sink.txt",
        "topics": "connect-test"
      },
      "class": "org.apache.kafka.connect.file.FileStreamSinkTask"
    }
  ]
}
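The correction workflow described above can be sketched in Java. This is a minimal, hypothetical sketch (the class and method names are not part of the KIP): it takes the base64 string found at record.value.object in the dead letter queue entry, decodes it, and repairs the specific missing quote identified by the exception. A real tool would drive the repair from the exception and the schema fields in the record node, and would then publish the fixed bytes back to the connect-test topic with a plain byte-array producer.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for repairing the bad record from the dead letter
// queue entry shown above.
public class DlqRecordFixer {

    // record.value.object from the error context above
    static final String BAD_VALUE_BASE64 = "eyJhOiAxMH0=";

    // Decode the base64 payload back into the raw JSON string.
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64),
                          StandardCharsets.UTF_8);
    }

    // Apply the fix: the field name "a" is missing its closing quote.
    // (A real repair would be guided by the exception and schema fields.)
    static String repair(String badJson) {
        return badJson.replace("{\"a:", "{\"a\":");
    }

    public static void main(String[] args) {
        String bad = decode(BAD_VALUE_BASE64);   // {"a: 10}
        String fixed = repair(bad);
        System.out.println(fixed);               // {"a": 10}
        // fixed.getBytes(StandardCharsets.UTF_8) would then be produced
        // back to the connect-test topic, e.g. with a
        // KafkaProducer<byte[], byte[]> using ByteArraySerializer.
    }
}
```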

 

...