...

Message Format for Error Context

The logged error context is an object containing the following fields:

Type    | Name          | Description
Record  | record        | The record which caused the exception.
Stage[] | stages        | An ordered array of stages.
int     | index         | The index into the stages array of the stage where the failure occurred.
string  | exception     | The exception, and stacktrace for the failure (if available).
int32   | attempt       | The number of attempts made to correct the failure.
string  | task_id       | The id of the task where the failure occurred.
int64   | time_of_error | The epoch time of the failure.

where Record contains the following fields:

Type                | Name             | Description
string              | topic            | The topic which this message was read from or will be written to.
int64               | timestamp        | The timestamp of the Kafka message.
string              | timestamp_type   | The type of the timestamp. One of: NO_TIMESTAMP_TYPE, CREATE_TIME, or LOG_APPEND_TIME.
int32               | partition        | The partition which this message was read from or will be written to.
SchemaAndValue      | key              | The key of the message.
SchemaAndValue      | value            | The value of the message.
SchemaAndValue      | headers          | The headers of the message.
map<string, string> | source_partition | Information about the source partition this message was created from (usually specific to a source connector implementation).
map<string, string> | source_offset    | Information about the source offset this message was created from (usually specific to a source connector implementation).

and SchemaAndValue is:

Type   | Name   | Description
string | schema | The data type of the value. For example, "BYTES" or "STRUCT".
string | value  | A string encoding of the value. If the type is "BYTES", this value must be base64 decoded to obtain the original byte[]. Otherwise, it is a JSON representation of the Connect record.
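To illustrate the "BYTES" case, the original byte[] can be recovered from the logged string with a standard base64 decode. A minimal sketch (the encoded string here is an illustrative value, not taken from a real record):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DecodeBytesValue {
    public static void main(String[] args) {
        // A hypothetical SchemaAndValue "value" field with schema "BYTES":
        // the string is a base64 encoding of the original byte[].
        String encoded = "eyJhIjogMTB9"; // base64 of the bytes {"a": 10}

        // Recover the original byte[] as it appeared in the failed record.
        byte[] original = Base64.getDecoder().decode(encoded);
        System.out.println(new String(original, StandardCharsets.UTF_8)); // prints {"a": 10}
    }
}
```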

Metrics

New metrics will monitor the number of failures and the behavior of the response handler. Specifically, the following set of counters will be added:

...


An example error context:
{
  "record": {
    "topic": "connect-test",
    "timestamp": 1526518349968,
    "timestamp_type": "CREATE_TIME",
    "offset": 918264,
    "partition": 7,
    "key": {
      "schema": "SCHEMA{STRUCT}",
      "object": "{\"id\":1012}"
    },
    "value": {
      "schema": "SCHEMA{BYTES}",
      "object": "eyJhOiAxMH0="
    }
  },
  "index": 1,
  "exception": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \norg.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\t...",
  "attempt": 10,
  "task_id": "my-connector-1",
  "time_of_error": 1526518715403,
  "stages": [
    {
      "type": "KEY_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "VALUE_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "TRANSFORMATION",
      "config": {
        "whitelist": "a,b"
      },
      "class": "org.apache.kafka.connect.transforms.ReplaceField$Value"
    },
    {
      "type": "TASK_PUT",
      "config": {
        "name": "local-file-sink",
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test.sink.txt",
        "topics": "connect-test"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    }
  ]
}
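The top-level index field in this context points into the stages array, so a consumer of the error context can locate the failing stage directly. A minimal sketch with the stage types from the example hardcoded (a real consumer would parse the JSON):

```java
public class FindFailedStage {
    public static void main(String[] args) {
        // Stage types in the order they appear in the "stages" array of the example.
        String[] stageTypes = {"KEY_CONVERTER", "VALUE_CONVERTER", "TRANSFORMATION", "TASK_PUT"};
        int index = 1; // the top-level "index" field of the error context

        // stages[index] is the stage where the failure occurred.
        System.out.println("Failed at stage: " + stageTypes[index]);
        // prints: Failed at stage: VALUE_CONVERTER
    }
}
```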


Compatibility, Deprecation, and Migration Plan

...