Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Discussion thread: here

JIRA: here

The proposal discussed in this KIP is implemented in this pull request.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Config OptionDescriptionDefault ValueDomain
errors.deadletterqueue.enableWrite the error context into a Kafka topicfalseBoolean
errors.deadletterqueue.topic.nameThe name of the topic where these messages are written to""A valid Kafka topic name
errors.deadletterqueue.topic.partitionsNumber of partitions for this topic5[1, 2 .. Integer.MAX_INT]
errors.deadletterqueue.topic.replication.factorThe replication factor for this topic3[1, 2 .. Short.MAX_SHORT]
errors.deadletterqueue.include.configsInclude the (worker, connector) configs in the log.falseBoolean
errors.deadletterqueue.include.messagesInclude the Connect Record which failed to process in the log.falseBoolean
errors.deadletterqueue.producer.*Config for the KafkaProduce to produce to this topic (for example: errors.dlq.producer.bootstrap.servers will set the bootstrap servers of the Kafka cluster).  

Message Format for Error Context

The schema for the logged error context is the following:

Code Block
languagejava
{
  "record": { "type": "object", "optional": false, "keys": {
    "topic": { "type": "string", "optional": false },
    "timestamp": { "type": "int64", "optional": true },
    "partition": { "type": "int32", "optional": false },
    "value": { "type": "object", "optional": true, "keys": {
        "schema": { "type": "string", "optional": false },
        "object": { "type": "string", "optional": false }
      } },
    "source_partition": { "type": "object", "optional": false, "keys": {} },
    "source_offset": { "type": "object", "optional": false, "keys": {} }
    } },
  "index": { "type": "int32", "optional": false },
  "exception": { "type": "string", "optional": false },
  "attempt": { "type": "int32", "optional": false },
  "task_id": { "type": "string", "optional": false },
  "time_of_error": { "type": "int64", "optional": false },
  "stages": { "type": "array", "optional": false, "keys": { 
    "type": { "type": "string" },
    "config": { "type": "object", "optional": false, "keys": {} },
    "class": { "type": "string", "optional": false } 
  } }
}

Metrics

The following new metrics will monitor the number of failures, and the behavior of the response handler. Specifically, the following set of counters:

...