...
Discussion thread: here
JIRA: here
The proposal discussed in this KIP is implemented in this pull request.
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The following example shows how the error context will appear if an error occurs when trying to deserialize a byte array using a JsonConverter in a FileSink connector which uses a ReplaceField transformation. Note that there were 10 reattempts, and a final error was declared at time '1526518715403'. The original record value, shown here as a base64 encoded string at record.value.object, is {"a: 10}, while the key is a struct {"id": 1012}. The exception we are seeing is caused by the missing double quote after the field name. It must be noted that if we had an org.apache.kafka.connect.data.Struct instead of a raw object when the error occurred, it would be formatted appropriately. In order to apply corrections to bad records, the developer can read records from the dead letter queue topic, analyze the exception, deserialize the record appropriately (using the schema fields in the record node to guide the deserialization), apply the fixes, and write the corrected record to the original Kafka topic which feeds records to the sink connector. In the example below, one would fix the missing quote and write the record back to the connect-test topic.
...
Code Block

{
  "record": {
    "topic": "connect-test",
    "timestamp": 1526518349968,
    "offset": 918264,
    "partition": 7,
    "key": {
      "schema": "SCHEMA{STRUCT}",
      "object": "{\"id\":1012}"
    },
    "value": {
      "schema": "SCHEMA{BYTES}",
      "object": "eyJhOiAxMH0="
    }
  },
  "stage": {
    "type": "VALUE_CONVERTER",
    "config": {
      "converter.type": "value",
      "schemas.enable": "false"
    },
    "class": "org.apache.kafka.connect.json.JsonConverter"
  },
  "index": 1,
  "exception": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\t...",
  "attempt": 10,
  "task_id": "my-connector-1",
  "time_of_error": 1526518715403,
  "stages": [
    {
      "type": "KEY_CONVERTER",
      "config": {
        "converter.type": "key",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "VALUE_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "TRANSFORMATION",
      "config": {
        "whitelist": "f1,f2"
      },
      "class": "org.apache.kafka.connect.transforms.ReplaceField$Value"
    },
    {
      "type": "TASK_PUT",
      "config": {
        "name": "local-file-sink",
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test.sink.txt",
        "topics": "connect-test"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    }
  ]
}
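The repair workflow described above (read the error context, decode the failed record, fix it, write it back) can be sketched as follows. This is a minimal illustration in Python, not part of the proposal: it assumes the error context JSON shown above, performs only the decode-and-repair step locally, and the repair itself (re-inserting the missing quote) is specific to this one malformed payload. Producing the corrected record back to the connect-test topic would be done with any Kafka client and is left as a comment.

```python
import base64
import json

# A trimmed copy of the hypothetical error context above, keeping only
# the fields this sketch needs.
error_context = {
    "record": {
        "topic": "connect-test",
        "value": {"schema": "SCHEMA{BYTES}", "object": "eyJhOiAxMH0="},
    },
}

# Recover the raw bytes of the record that failed conversion.
raw = base64.b64decode(error_context["record"]["value"]["object"])
# raw is b'{"a: 10}' -- the closing quote after the field name is missing.

# Record-specific repair: re-insert the missing double quote.
fixed = raw.replace(b'{"a:', b'{"a":', 1)
json.loads(fixed)  # raises if the repair did not produce valid JSON

# A real pipeline would now produce `fixed` back to the original topic,
# error_context["record"]["topic"] ("connect-test"), so the sink
# connector can re-consume it.
print(fixed.decode())
```

The schema hint in record.value.object ("SCHEMA{BYTES}") tells the developer to treat the payload as raw bytes; a STRUCT schema would instead call for rebuilding the object field by field.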
...