...
Discussion thread: here
JIRA: here
The proposal discussed in this KIP is implemented in this pull request.
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Config Option | Description | Default Value | Domain |
---|---|---|---|
errors.deadletterqueue.enable | Write the error context into a Kafka topic | false | Boolean |
errors.deadletterqueue.topic.name | The name of the topic to which these messages are written | "" | A valid Kafka topic name |
errors.deadletterqueue.topic.partitions | Number of partitions for this topic | 5 | [1, 2 .. Integer.MAX_VALUE] |
errors.deadletterqueue.topic.replication.factor | The replication factor for this topic | 3 | [1, 2 .. Short.MAX_VALUE] |
errors.deadletterqueue.include.configs | Include the (worker, connector) configs in the log. | false | Boolean |
errors.deadletterqueue.include.messages | Include the Connect Record which failed to process in the log. | false | Boolean |
errors.deadletterqueue.producer.* | Config for the KafkaProducer that produces to this topic (for example, errors.deadletterqueue.producer.bootstrap.servers sets the bootstrap servers of the Kafka cluster). | | |
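As a rough illustration of how the options above might be combined, the sketch below builds a connector configuration and checks it against the domains listed in the table. The connector name, connector class, topic names and bootstrap server are hypothetical placeholders, and `validate_dlq_config` is an illustrative helper, not part of Connect. The rule that a topic name must be set when the queue is enabled is an assumption, not something the table states.

```python
import json

PREFIX = "errors.deadletterqueue."
INT_MAX = 2**31 - 1    # Java Integer.MAX_VALUE
SHORT_MAX = 2**15 - 1  # Java Short.MAX_VALUE

# Only the errors.deadletterqueue.* keys come from the table above;
# every other key and all values are hypothetical placeholders.
connector_config = {
    "name": "example-sink",
    "connector.class": "com.example.ExampleSinkConnector",
    "topics": "orders",
    PREFIX + "enable": "true",
    PREFIX + "topic.name": "orders-dlq",
    PREFIX + "topic.partitions": "5",
    PREFIX + "topic.replication.factor": "3",
    PREFIX + "include.configs": "false",
    PREFIX + "include.messages": "true",
    PREFIX + "producer.bootstrap.servers": "localhost:9092",
}


def validate_dlq_config(config):
    """Return a list of problems, checking the domains given in the table."""
    problems = []
    # Boolean options default to "false" when absent.
    for key in ("enable", "include.configs", "include.messages"):
        value = str(config.get(PREFIX + key, "false"))
        if value not in ("true", "false"):
            problems.append(f"{PREFIX}{key} must be true or false, got {value!r}")
    # Assumption: an enabled queue needs a non-empty topic name.
    enabled = str(config.get(PREFIX + "enable", "false")) == "true"
    if enabled and not config.get(PREFIX + "topic.name", ""):
        problems.append(f"{PREFIX}topic.name must be set when the queue is enabled")
    # Integer options with the defaults and upper bounds from the table.
    for key, default, upper in (
        ("topic.partitions", "5", INT_MAX),
        ("topic.replication.factor", "3", SHORT_MAX),
    ):
        raw = str(config.get(PREFIX + key, default))
        if not raw.isdigit() or not 1 <= int(raw) <= upper:
            problems.append(f"{PREFIX}{key} must be in [1, {upper}], got {raw!r}")
    return problems


if __name__ == "__main__":
    print(json.dumps(connector_config, indent=2))
    print(validate_dlq_config(connector_config))
```

A configuration like this would typically be submitted as JSON to the Connect REST interface. The check only mirrors the table and does not contact a cluster.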
Message Format for Error Context
The schema for the logged error context is the following:
{
  "record": { "type": "object", "optional": false, "keys": {
    "topic": { "type": "string", "optional": false },
    "timestamp": { "type": "int64", "optional": true },
    "partition": { "type": "int32", "optional": false },
    "value": { "type": "object", "optional": true, "keys": {
      "schema": { "type": "string", "optional": false },
      "object": { "type": "string", "optional": false }
    } },
    "source_partition": { "type": "object", "optional": false, "keys": {} },
    "source_offset": { "type": "object", "optional": false, "keys": {} }
  } },
  "index": { "type": "int32", "optional": false },
  "exception": { "type": "string", "optional": false },
  "attempt": { "type": "int32", "optional": false },
  "task_id": { "type": "string", "optional": false },
  "time_of_error": { "type": "int64", "optional": false },
  "stages": { "type": "array", "optional": false, "keys": {
    "type": { "type": "string" },
    "config": { "type": "object", "optional": false, "keys": {} },
    "class": { "type": "string", "optional": false }
  } }
}
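To make the schema more concrete, the sketch below builds one error-context record with the fields listed above and checks that none of the non-optional fields are missing. All values, including the stage type name, the task id and the timestamps, are invented for illustration. `missing_fields` is a hypothetical helper, not part of the proposal.

```python
import json

# Non-optional fields, taken from the schema above.
REQUIRED_TOP_LEVEL = {
    "record", "index", "exception", "attempt",
    "task_id", "time_of_error", "stages",
}
REQUIRED_RECORD = {"topic", "partition", "source_partition", "source_offset"}

# Example record; every value here is made up for illustration.
error_context = {
    "record": {
        "topic": "orders",
        "timestamp": 1526518349968,  # optional in the schema
        "partition": 0,
        "value": {"schema": "STRING", "object": "not-a-number"},  # optional
        "source_partition": {},
        "source_offset": {},
    },
    "index": 0,
    "exception": "org.apache.kafka.connect.errors.DataException: example failure",
    "attempt": 1,
    "task_id": "example-sink-0",
    "time_of_error": 1526518350000,
    "stages": [
        {
            "type": "VALUE_CONVERTER",  # hypothetical stage type name
            "config": {},
            "class": "org.apache.kafka.connect.json.JsonConverter",
        }
    ],
}


def missing_fields(ctx):
    """Return the names of non-optional fields absent from an error context."""
    missing = sorted(REQUIRED_TOP_LEVEL - set(ctx))
    record = ctx.get("record", {})
    missing += sorted("record." + name for name in REQUIRED_RECORD - set(record))
    return missing


if __name__ == "__main__":
    print(json.dumps(error_context, indent=2))
    print(missing_fields(error_context))
```

A consumer of the error context could run a check like this before processing a message, although the proposal itself does not prescribe any client-side validation.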
Metrics
The following new metrics will monitor the number of failures and the behavior of the response handler. Specifically, the following set of counters will be added:
...