...

For errors in a source connector, the process is similar, but care needs to be taken when writing back to the source. For example, if the source is a SQL database, we can append corrected records to a table so that they are picked up in the next SourceTask#poll() cycle. Furthermore, source records can carry lineage information (via the sourcePartition and sourceOffset fields in SourceRecord). These would be logged with the error context and can be used to determine how to insert the corrected records. For example, the JDBC source connector stores the row information with every message (the table name, primary key, and timestamp fields), and this can be used to update the bad rows. Please note that the changes introduced in this KIP only report the errors; applying the corrections themselves is beyond the scope of this KIP.
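To make the lineage idea concrete, here is a minimal sketch of a source task attaching lineage to a record it emits. The key names in the partition and offset maps ("table", "primary.key", "ts") are illustrative assumptions, not names mandated by Connect; each connector chooses its own fields.

Code Block
languagejava
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class LineageExample {

    // Builds a record whose sourcePartition/sourceOffset identify the exact
    // row it came from, so an external tool can later write a corrected
    // value back to the right place. Key names here are illustrative only.
    static SourceRecord recordWithLineage() {
        Map<String, ?> sourcePartition = Map.of("table", "orders");
        Map<String, ?> sourceOffset = Map.of("primary.key", 42L, "ts", 1526518349968L);
        return new SourceRecord(sourcePartition, sourceOffset,
                "connect-test", Schema.STRING_SCHEMA, "{\"a\": 10}");
    }
}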

 

Code Block
languagejson
{
  "record": {
    "topic": "connect-test",
    "timestamp": 1526518349968,
    "offset": 918264,
    "partition": 7,
    "value": {
      "schema": "SCHEMA{BYTES}",
      "object": "eyJhOiAxMH0="
    }
  },
  "stage": {
    "type": "VALUE_CONVERTER",
    "config": {
      "converter.type": "value",
      "schemas.enable": "false"
    },
    "class": "org.apache.kafka.connect.json.JsonConverter"
  },
  "index": 0,
  "exception": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \norg.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\t...",
  "attempt": 10,
  "task_id": "my-connector-1",
  "time_of_error": 1526518715403,
  "stages": [
    {
      "type": "VALUE_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "TRANSFORMATION",
      "config": {
        "converter.typewhitelist": "value",
        "schemas.enable": "false"f1,f2"
      },
      "class": "org.apache.kafka.connect.jsontransforms.JsonConverterReplaceField$Value"
    },
    {
      "type": "TASK_PUT",
      "config": {
        "name": "local-file-sink",
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test.sink.txt",
        "topics": "connect-test"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    }
  ]
}
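Since this log entry is plain JSON, an external tool can consume it and recover the failed record's coordinates. The sketch below uses Jackson and assumes the entry format shown above; the class and method names are hypothetical.

Code Block
languagejava
import java.util.Base64;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ErrorContextReader {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Pulls the failed record's topic/partition/offset and raw bytes out of
    // one error-context entry so a reprocessing job can decide what to do.
    static void inspect(String json) throws Exception {
        JsonNode ctx = MAPPER.readTree(json);
        JsonNode record = ctx.get("record");
        String topic = record.get("topic").asText();
        int partition = record.get("partition").asInt();
        long offset = record.get("offset").asLong();
        // In this format the raw value is base64-encoded bytes.
        byte[] value = Base64.getDecoder()
                .decode(record.get("value").get("object").asText());
        System.out.printf("failed record at %s-%d@%d (%d bytes)%n",
                topic, partition, offset, value.length);
    }
}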

 

...

Compatibility, Deprecation, and Migration Plan

The current behavior in Connect is to kill the task on the first error in any stage. As mentioned above, this will remain the default behavior if connector configurations are not changed.
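As a sketch of what opting in looks like, the per-connector overrides below enable error tolerance and logging for the file sink used in the example above; omitting them keeps today's fail-fast behavior. The property names follow this KIP's proposal (errors.tolerance, errors.log.enable, errors.log.include.messages).

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

public class ToleranceConfig {

    // Connector configuration with this KIP's error-handling overrides.
    // Without the three "errors.*" entries, the task fails on the first
    // error in any stage, exactly as Connect behaves today.
    static Map<String, String> sinkConfig() {
        Map<String, String> props = new HashMap<>();
        props.put("name", "local-file-sink");
        props.put("connector.class", "FileStreamSink");
        props.put("tasks.max", "1");
        props.put("file", "test.sink.txt");
        props.put("topics", "connect-test");
        props.put("errors.tolerance", "all");
        props.put("errors.log.enable", "true");
        props.put("errors.log.include.messages", "true");
        return props;
    }
}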

...