...
Message Format for Error Context
The schema for the logged error context is an object containing the following fields:
Type | Name | Description |
---|---|---|
Record | record | The record which caused the exception. |
Stage[] | stages | An ordered array of stages. |
int | index | The index into the above array, pointing at the stage where the failure occurred. |
string | exception | The exception and stack trace for the failure (if available). |
int32 | attempt | The number of attempts made to correct the failure. |
string | task_id | The id of the task where the failure occurred. |
int64 | time_of_error | The epoch time of the failure. |
where Record contains the following fields:
Type | Name | Description |
---|---|---|
string | topic | the topic which this message was read from or will be written to. |
int64 | timestamp | the timestamp of the Kafka message. |
string | timestamp_type | the type of the timestamp. One of: NO_TIMESTAMP_TYPE, CREATE_TIME or LOG_APPEND_TIME. |
int32 | partition | the partition which this message was read from or will be written to. |
SchemaAndValue | key | the key of the message. |
SchemaAndValue | value | the value of the message. |
SchemaAndValue | headers | the headers of the message. |
map<string, string> | source_partition | information about the source partition from which this message was created (usually specific to a source connector implementation). |
map<string, string> | source_offset | information about the source offset from which this message was created (usually specific to a source connector implementation). |
and SchemaAndValue is:
Type | Name | Description |
---|---|---|
string | schema | the data type of the value. For example, "BYTES" or "STRUCT". |
string | value | a string encoding of the value. If the type is "BYTES", this value must be base64-decoded to obtain the original byte[]. Otherwise, it is a JSON representation of the Connect record. |
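As a sketch of how a consumer of these error logs might decode a SchemaAndValue, the helper below base64-decodes "BYTES" values and JSON-parses everything else. The function name and the exact schema strings it matches are assumptions for illustration; they are not part of this proposal's API.

```python
import base64
import json

def decode_schema_and_value(sav):
    """Decode a SchemaAndValue dict from the error context.

    Assumption: `sav` has the shape described in the table above,
    i.e. {"schema": ..., "value": ...}. If the schema is "BYTES",
    the value is base64-encoded; otherwise the value is a JSON
    representation of the Connect record.
    """
    if sav["schema"] == "BYTES":
        return base64.b64decode(sav["value"])
    return json.loads(sav["value"])

# The base64 payload from the example below decodes to the raw
# (malformed) message bytes that caused the conversion failure.
raw = decode_schema_and_value({"schema": "BYTES", "value": "eyJhOiAxMH0="})
print(raw)  # b'{"a: 10}'
```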
Metrics
New metrics will be added to monitor the number of failures and the behavior of the response handler, specifically the following set of counters:
...
For example, a logged error context might look like:

```json
{
  "record": {
    "topic": "connect-test",
    "timestamp": 1526518349968,
    "timestamp_type": "CREATE_TIME",
    "offset": 918264,
    "partition": 7,
    "key": {
      "schema": "SCHEMA{STRUCT}",
      "object": "{\"id\":1012}"
    },
    "value": {
      "schema": "SCHEMA{BYTES}",
      "object": "eyJhOiAxMH0="
    }
  },
  "index": 1,
  "exception": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \norg.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\t...",
  "attempt": 10,
  "task_id": "my-connector-1",
  "time_of_error": 1526518715403,
  "stages": [
    {
      "type": "KEY_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "VALUE_CONVERTER",
      "config": {
        "converter.type": "value",
        "schemas.enable": "false"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    },
    {
      "type": "TRANSFORMATION",
      "config": {
        "whitelist": "a,b"
      },
      "class": "org.apache.kafka.connect.transforms.ReplaceField$Value"
    },
    {
      "type": "TASK_PUT",
      "config": {
        "name": "local-file-sink",
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test.sink.txt",
        "topics": "connect-test"
      },
      "class": "org.apache.kafka.connect.json.JsonConverter"
    }
  ]
}
```
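Since `index` points into the `stages` array, a log consumer can locate the exact stage that failed directly from the parsed context. The helper below is a hypothetical sketch, assuming the error context has already been read as a JSON string; the function name is illustrative, not part of this proposal.

```python
import json

def failing_stage(error_context):
    """Return the stage entry that the error context's `index` field points at."""
    return error_context["stages"][error_context["index"]]

# A trimmed-down context with only the fields this lookup needs.
ctx = json.loads(
    '{"index": 1, "stages": ['
    '{"type": "KEY_CONVERTER", "class": "org.apache.kafka.connect.json.JsonConverter"}, '
    '{"type": "VALUE_CONVERTER", "class": "org.apache.kafka.connect.json.JsonConverter"}]}'
)
print(failing_stage(ctx)["type"])  # VALUE_CONVERTER
```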
Compatibility, Deprecation, and Migration Plan
...