...
For errors in a source connector, the process is similar, but care needs to be taken while writing back to the source. For example, if the source is a SQL database, we can append records to a table so that they can be picked up in the next SourceTask#poll() cycle. Furthermore, source records can carry lineage information (via the sourcePartition and sourceOffset fields in SourceRecord). These would be logged with the context, and can serve to determine how to insert the corrected records. For example, the JDBC source connector stores row information (the table name, primary key, and timestamp fields) with every message, and this can be used to update the bad rows. Please note that the changes introduced in this KIP only report the errors; applying the corrections themselves is beyond the scope of this KIP.
An example of the logged error context:

```
{
  "record": {
    "topic": "connect-test",
    "timestamp": 1526518349968,
    "offset": 918264,
    "partition": 7,
    "value": {
      "schema": "SCHEMA{BYTES}",
      "object": "eyJhOiAxMH0="
    }
  },
  "stage": {
    "type": "VALUE_CONVERTER",
    "config": {
      "converter.type": "value",
      "schemas.enable": "false"
    },
    "class": "org.apache.kafka.connect.json.JsonConverter"
  },
  "index": 0,
  "exception": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \norg.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\t...",
  "attempt": 10,
  "task_id": "my-connector-1",
  "time_of_error": 1526518715403,
  "stages": [
    {
      "type": "VALUE_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "TRANSFORMATION",
      "config": {
        "whitelist": "f1,f2"
      },
      "class": "org.apache.kafka.connect.transforms.ReplaceField$Value"
    },
    {
      "type": "TASK_PUT",
      "config": {
        "name": "local-file-sink",
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test.sink.txt",
        "topics": "connect-test"
      },
      "class": "org.apache.kafka.connect.file.FileStreamSinkTask"
    }
  ]
}
```
...
Compatibility, Deprecation, and Migration Plan
The current behavior in Connect is to kill the task on the first error in any stage. As mentioned above, this remains the default behavior if connector configurations are not changed.
...