...
Discussion thread: here
JIRA: here
The proposal discussed in this KIP is implemented in this pull request.
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Logging errors with the application logs is convenient and requires no additional setup. However, the log messages are informative but not easily actionable: for example, it is hard to collect log files from various machines, parse them, and take appropriate action. By introducing a dead letter queue, we can overcome these problems. We implement the dead letter queue using a single Kafka topic per connector (as determined by the errors.deadletterqueue.topic.name property). Error context will be logged as JSON strings into this topic.
For sink connectors, a developer can consume bad records from this topic, correct them and write the corrected records back to the original Kafka topics. Similarly, for source connectors, the developer can write the corrected records back to the original source.
While logging the error context, it might be worthwhile to take precautions to hide sensitive content. For example, some of the configs might contain sensitive information such as usernames or passwords. To prevent logging critical information, we provide configuration options to disable logging the messages (errors.dlq.include.messages) and configs (errors.dlq.include.configs).
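For instance, a connector could route failed records to a dead letter queue while redacting potentially sensitive data from the logged error context. A minimal sketch using the properties defined in this KIP (the topic name is illustrative):

```properties
# route failed records to a per-connector dead letter queue topic
errors.dlq.enable=true
errors.dlq.topic.name=my-connector-errors

# redact potentially sensitive data from the logged error context
errors.dlq.include.messages=false
errors.dlq.include.configs=false
```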
Example 1: Fail Fast
To maintain backward compatibility, a Connector task will by default fail immediately upon an error or exception. This matches the behavior of earlier releases, ensuring that existing installations continue to work the same way. Although no extra configuration properties are required for this behavior, the following properties may be added to a connector configuration to state it explicitly:
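Assuming the defaults implied by this KIP, an explicit fail-fast configuration might look like the following sketch, using only properties that appear elsewhere in this document (the exact defaults are defined by the implementation):

```properties
# do not retry on failure
errors.retries.limit=0

# fail the task on the first error
errors.tolerance.limit=0

# do not log the error context, and disable the dead letter queue
errors.log.enable=false
errors.dlq.enable=false
```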
...
Code Block
# retry up to 100 times waiting up to 5 minutes between consecutive failures
errors.retries.limit=100
errors.retries.delay.max.ms=300000

# log error context along with application logs, but do not include configs and messages
errors.log.enable=true
errors.log.include.configs=false
errors.log.include.messages=false

# produce error context into a secure Kafka topic
errors.dlq.enable=true
errors.dlq.topic.name=my-connector-errors
errors.dlq.topic.partitions=25
errors.dlq.topic.replication.factor=3
errors.dlq.include.configs=true
errors.dlq.include.messages=true
errors.dlq.producer.bootstrap.servers=secure-broker:9093
errors.dlq.producer.acks=0
errors.dlq.producer.security.protocol=SSL
errors.dlq.producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
errors.dlq.producer.ssl.truststore.password=test1234
errors.dlq.producer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
errors.dlq.producer.ssl.keystore.password=test1234
errors.dlq.producer.ssl.key.password=test1234

# tolerate all errors
errors.tolerance.limit=-1
errors.tolerance.rate.limit=-1
errors.tolerance.rate.duration.ms=60000
Example 4: Error context formatted as JSON
The following example shows how the error context will appear if an error occurs while trying to deserialize a byte array using a JsonConverter in a FileSink connector that uses a ReplaceField transformation. Note that there were 10 attempts, and the final error was declared at time 1526518715403.
Code Block
{
"record": {
"topic": "my-topic",
"timestamp": 1526518349968,
"partition": 7,
"value": {
"schema": "BYTES",
"object": "IllYQmhZMmhsSUd0aFptdGhJR052Ym01bFkzUT0i"
}
},
"stage": {
"type": "VALUE_CONVERTER",
"config": {
"converter.type": "value",
"schemas.enable": "false"
},
"class": "org.apache.kafka.connect.json.JsonConverter"
},
"index": 0,
"exception": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \norg.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)\n\t...",
"attempt": 10,
"task_id": "my-connector-1",
"time_of_error": 1526518715403,
"stages": [
{
"type": "VALUE_CONVERTER",
"config": {
"converter.type": "value",
"schemas.enable": "false"
},
"class": "org.apache.kafka.connect.json.JsonConverter"
},
{
"type": "TRANSFORMATION",
"config": {
"converter.type": "value",
"schemas.enable": "false"
},
"class": "org.apache.kafka.connect.json.JsonConverter"
},
{
"type": "TASK_PUT",
"config": {
"name": "local-file-sink",
"connector.class": "FileStreamSink",
"tasks.max": 1,
"file": "test.sink.txt",
"topics": "connect-test"
},
"class": "org.apache.kafka.connect.json.JsonConverter"
}
]
}
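Because the record value is captured as a base64-encoded string (here, the bytes of a JSON-quoted string that is itself base64 text), a consumer of the error topic can recover the original payload by decoding twice. The sketch below is illustrative only; the class and method names are not part of this KIP:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DlqRecordDecoder {

    // Recover the original record payload from the base64-encoded "object"
    // field of the error context. The first decode yields the raw value
    // bytes, which in this example are a JSON-quoted base64 string; strip
    // the surrounding quotes and decode once more to get the payload.
    static String decodeObjectField(String objectField) {
        String quoted = new String(Base64.getDecoder().decode(objectField),
                                   StandardCharsets.UTF_8);
        String inner = quoted.replaceAll("^\"|\"$", "");
        return new String(Base64.getDecoder().decode(inner),
                          StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The "object" value from the error context above
        String encoded = "IllYQmhZMmhsSUd0aFptdGhJR052Ym01bFkzUT0i";
        System.out.println(decodeObjectField(encoded)); // prints "apache kafka connect"
    }
}
```

The double decode reflects how this particular value was serialized (a BYTES schema rendered as a JSON string); values captured differently may need only a single decode.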
Compatibility, Deprecation, and Migration Plan
...