...
Connect should allow users to configure how failures are handled during all phases of processing records. Some failures, such as the unavailability of external components, may be resolved by simply retrying, while other errors should be recorded and the problem records skipped. Where possible, Connect should be able to record the error and optionally include the problematic records and the configuration states of the connector, transform and converter. Since no single solution works for everyone, all of this error handling behavior should be configurable.
This proposal aims to change the Connect framework to allow it to automatically deal with errors while processing records in a Connector. By default, Connect will fail immediately when an error occurs, which is the previous Connect behavior. Therefore, all new behaviors must be explicitly enabled.
...
Retry the failed operation a configurable number of times, with exponential backoff between retries starting from a fixed initial delay. The number of retries and the backoff can be configured using the following new properties:
...
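The retry policy above can be illustrated with a small, stdlib-only Java sketch. It is not Connect's implementation. The class RetryWithBackoff and its parameters (maxRetries, initialDelayMs, maxDelayMs) are hypothetical stand-ins for the properties listed above, and the upper cap on the delay is an assumption of this sketch. The delay before retry n is initialDelayMs * 2^(n-1), capped at maxDelayMs. Once all retries are exhausted, the last exception is rethrown so the caller can count the failure as an error.

```java
import java.util.concurrent.Callable;

// Hypothetical helper illustrating retry with exponential backoff.
// Names and parameters are illustrative, not Connect's actual API.
final class RetryWithBackoff {
    private final int maxRetries;       // retries allowed after the first attempt
    private final long initialDelayMs;  // fixed starting delay
    private final long maxDelayMs;      // assumed cap so the delay cannot grow unbounded

    RetryWithBackoff(int maxRetries, long initialDelayMs, long maxDelayMs) {
        this.maxRetries = maxRetries;
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Delay before retry number `retry` (1-based): initial * 2^(retry - 1), capped at maxDelayMs.
    long delayForRetry(int retry) {
        long delay = initialDelayMs;
        for (int i = 1; i < retry && delay < maxDelayMs; i++) {
            delay *= 2; // double the delay for each earlier retry
        }
        return Math.min(delay, maxDelayMs);
    }

    // Runs op; on failure, sleeps and retries up to maxRetries times.
    // Rethrows the last exception once retries are exhausted.
    <T> T execute(Callable<T> op) throws Exception {
        int retry = 0;
        while (true) {
            try {
                return op.call();
            } catch (Exception e) {
                if (retry >= maxRetries) {
                    throw e; // retries exhausted: the failure now counts as an error
                }
                retry++;
                Thread.sleep(delayForRetry(retry));
            }
        }
    }
}
```

With initialDelayMs = 100 and maxDelayMs = 1000, the successive delays are 100, 200, 400, 800 and then 1000 ms for every later retry.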
Tolerate up to a configurable number of errors in a task. A failed operation is declared to be an error only if Connect has exhausted all retry options. If the task fails to perform an operation on a record while still within the tolerance limit, the record is skipped. Once the tolerance limit (overall or rate) is reached, the task will fail. Tolerance limits can be configured using the following new properties:
...
Config Option | Description | Default Value | Domain |
---|---|---|---|
errors.log.enable | Log the error context (the base information, configs of various stages and the failed message) along with the other application logs. | false | Boolean |
errors.log.include.configs | Include the worker and connector configs in the log. | false | Boolean |
errors.log.include.messages | Include the Connect record that failed processing in the log. | false | Boolean |
...
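As a hedged illustration of how the errors.log.* options in the table above combine, the Java sketch below reads them from a plain configuration map. The class ErrorLogSettings is hypothetical, and the rule that configs and records are only logged when errors.log.enable is also true is an assumption of this sketch. It does reflect the opt-in defaults: with no properties set, nothing extra is logged.

```java
import java.util.Map;

// Hypothetical holder for the three logging options; not Connect's actual config class.
// Every option defaults to false, so nothing extra is logged unless explicitly enabled.
final class ErrorLogSettings {
    final boolean enable;
    final boolean includeConfigs;
    final boolean includeMessages;

    ErrorLogSettings(Map<String, String> config) {
        this.enable = flag(config, "errors.log.enable");
        this.includeConfigs = flag(config, "errors.log.include.configs");
        this.includeMessages = flag(config, "errors.log.include.messages");
    }

    private static boolean flag(Map<String, String> config, String key) {
        // parseBoolean is true only for "true" (case-insensitive); anything else is false
        return Boolean.parseBoolean(config.getOrDefault(key, "false"));
    }

    // Assumption of this sketch: the include.* options only apply when logging is enabled.
    boolean logsConfigs() { return enable && includeConfigs; }
    boolean logsMessages() { return enable && includeMessages; }
}
```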
Message Format for Error Context
Connect will use the JsonConverter to serialize the error context and log it in JSON format. The JSON object will contain the following fields:
Type | Name | Description |
---|---|---|
Record | record | The record which caused the exception (if available). |
Stage[] | stages | An ordered array of stages. |
int | index | The position in the stages array of the stage where the failure occurred. |
string | exception | The exception and stack trace for the failure (if available). |
int32 | attempt | Number of attempts made to perform the failed operation. |
string | task_id | The ID of the task where the failure occurred. |
int64 | time_of_error | The epoch time of the failure. |
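For illustration only, the sketch below renders the scalar fields of the error context as JSON using nothing but the Java standard library. The proposal itself serializes the context with the JsonConverter; the hypothetical ErrorContextSketch class hand-writes the JSON instead, omits the nested record and stages fields, and assumes time_of_error is in epoch milliseconds. The stackTrace helper shows one way to produce the exception string from a Throwable.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical stand-in for the error context; not Connect's actual class.
// Only the scalar fields are rendered; `record` and `stages` are omitted.
final class ErrorContextSketch {
    final String taskId;     // task_id
    final int attempt;       // attempt
    final long timeOfError;  // time_of_error (epoch millis assumed)
    final int index;         // index into the stages array
    final String exception;  // exception and stack trace, may be null

    ErrorContextSketch(String taskId, int attempt, long timeOfError, int index, String exception) {
        this.taskId = taskId;
        this.attempt = attempt;
        this.timeOfError = timeOfError;
        this.index = index;
        this.exception = exception;
    }

    // Captures the same text that Throwable.printStackTrace would print.
    static String stackTrace(Throwable t) {
        if (t == null) return null;
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }

    String toJson() {
        return "{"
            + "\"task_id\":" + quote(taskId) + ","
            + "\"attempt\":" + attempt + ","
            + "\"index\":" + index + ","
            + "\"exception\":" + quote(exception) + ","
            + "\"time_of_error\":" + timeOfError
            + "}";
    }

    // Minimal JSON string escaping; null becomes the JSON literal null.
    private static String quote(String s) {
        if (s == null) return "null";
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c)); // other control chars
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }
}
```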
...
Type | Name | Description |
---|---|---|
string | schema | The data type of the value, for example "BYTES" or "STRUCT". |
string | value | A string encoding of the value. If the type is "BYTES", the value must be base64-decoded to obtain the original byte[]. Otherwise, it is a JSON representation of the Connect record object, created using the JsonConverter. |
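A reader of the logged context has to undo the encoding described for the value field. The following is a minimal sketch, assuming the standard Base64 alphabet (java.util.Base64.getDecoder()); the helper name LoggedValueDecoder is hypothetical.

```java
import java.util.Base64;

// Hypothetical reader-side helper for the schema/value pair of a logged record.
final class LoggedValueDecoder {
    // For "BYTES", returns the original byte[]; otherwise returns the JSON text unchanged.
    static Object decode(String schema, String value) {
        if ("BYTES".equals(schema)) {
            return Base64.getDecoder().decode(value); // assumes the standard Base64 alphabet
        }
        return value; // JSON produced by the JsonConverter; parse with any JSON library
    }
}
```

Non-BYTES values are deliberately left as text, since turning them back into Connect data would require the schema-aware JsonConverter rather than a generic decoder.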
Finally, the Stage object describes the different operations performed in a Connector pipeline:
...