...
- In the source connector:
  - After polling, there are transformations and conversions that happen before the records are sent to Kafka.
- In the sink connector:
  - Record latency: wall-clock time - record timestamp, to evaluate how late records are processed.
  - Convert and transform time before sending records to an external system.
With these enhanced metrics available, operators/developers could:
- Monitor and alert when sink processing happens after an accepted latency (e.g. > 5 secs).
  - Current workaround: insert record timestamps into the target system and calculate the latency there.
- Observe the processing lifecycle, monitor spikes caused by conversions and transforms, and reduce the time to remediation.
  - Current workaround: infer the connector's latency by capturing the timestamp of the source system and comparing it with the Kafka record timestamp, which is not trivial.
  - The current poll/put and produce/fetch latencies are not enough to infer the time taken in between.
  - Current workaround: performance-test transforms in non-production environments to measure how latency increases with throughput.
Public Interfaces
The following metrics would be added at the Task Level:
kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
| Sensor / Recording Level | Attribute name | Description |
|---|---|---|
| sink-record-batch-latency / INFO | | The maximum latency of a record batch, measured by comparing the oldest record timestamp in the batch with the system time (i.e. wall-clock) when the batch is received by the sink task, right after the consumer poll and before conversions. |
| sink-record-latency / DEBUG | | The maximum latency of a record, measured by comparing the record timestamp with the system time (i.e. wall-clock) when the record is received by the sink task, right after the consumer poll and before conversions. |
| | | The average latency of a record, measured by comparing the record timestamp with the system time (i.e. wall-clock) when the record is received by the sink task, right after the consumer poll and before conversions. |
| convert-sink-record-time / INFO | | The average time taken by this task to convert sink records, including key, value, and headers conversion. |
| | | The maximum time taken by this task to convert sink records, including key, value, and headers conversion. |
| convert-sink-record-key-time / DEBUG | | The average time taken by this task to convert sink record keys. |
| | | The maximum time taken by this task to convert sink record keys. |
| convert-sink-record-value-time / DEBUG | | The average time taken by this task to convert sink record values. |
| | | The maximum time taken by this task to convert sink record values. |
| convert-sink-record-headers-time / DEBUG | | The average time taken by this task to convert sink record headers. |
| | | The maximum time taken by this task to convert sink record headers. |
| transform-chain-sink-record-time / INFO | | The average time taken by this task to apply all the transforms included in this task. |
| | | The maximum time taken by this task to apply all the transforms included in this task. |
| transform-sink-record-time (?) / DEBUG | | The average time taken by this task to apply a specific transform included in this task. |
| | | The maximum time taken by this task to apply a specific transform included in this task. |
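To make the difference between the batch-level and record-level latency sensors concrete, here is a minimal Java sketch. It assumes a single wall-clock reading per polled batch and models records only by their timestamps; the class and method names are hypothetical and not part of the Kafka Connect API. The `sink-record-batch-latency` sample uses only the oldest timestamp in the batch, while `sink-record-latency` produces one sample per record, from which both a maximum and an average can be derived.

```java
import java.util.List;

// Hypothetical helper illustrating the two proposed sink latency measurements;
// not part of the Kafka Connect API.
final class SinkLatencySketch {

    // One sample per batch: wall-clock time minus the oldest record timestamp.
    static long batchLatencyMs(List<Long> recordTimestampsMs, long wallClockMs) {
        if (recordTimestampsMs.isEmpty()) {
            throw new IllegalArgumentException("empty batch");
        }
        long oldest = Long.MAX_VALUE;
        for (long ts : recordTimestampsMs) {
            oldest = Math.min(oldest, ts);
        }
        return wallClockMs - oldest;
    }

    // One sample per record; returns {max, avg} over the batch.
    static double[] recordLatencyMaxAvgMs(List<Long> recordTimestampsMs, long wallClockMs) {
        if (recordTimestampsMs.isEmpty()) {
            throw new IllegalArgumentException("empty batch");
        }
        long max = Long.MIN_VALUE;
        long sum = 0L;
        for (long ts : recordTimestampsMs) {
            long latency = wallClockMs - ts;
            max = Math.max(max, latency);
            sum += latency;
        }
        return new double[] {max, (double) sum / recordTimestampsMs.size()};
    }

    public static void main(String[] args) {
        long now = 10_000L; // wall-clock reading taken right after the consumer poll
        List<Long> batch = List.of(4_000L, 7_000L, 9_000L);
        System.out.println("batch latency: " + batchLatencyMs(batch, now) + " ms");
        double[] stats = recordLatencyMaxAvgMs(batch, now);
        System.out.println("record latency max/avg: " + stats[0] + " / " + stats[1] + " ms");
    }
}
```

With one wall-clock reading per batch, the batch sample equals the maximum record sample (6000 ms in this example); the per-record samples additionally yield the average (about 3333 ms), at the cost of one recording per record.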
kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
| Sensor / Recording Level | Attribute name | Description |
|---|---|---|
| convert-source-record-time / INFO | | The average time taken by this task to convert source records, including key, value, and headers conversion. |
| | | The maximum time taken by this task to convert source records, including key, value, and headers conversion. |
| convert-source-record-key-time / DEBUG | | The average time taken by this task to convert source record keys. |
| | | The maximum time taken by this task to convert source record keys. |
| convert-source-record-value-time / DEBUG | | The average time taken by this task to convert source record values. |
| | | The maximum time taken by this task to convert source record values. |
| convert-source-record-headers-time / DEBUG | | The average time taken by this task to convert source record headers. |
| | | The maximum time taken by this task to convert source record headers. |
| transform-chain-source-record-time / INFO | | The average time taken by this task to apply all the transforms included in this task. |
| | | The maximum time taken by this task to apply all the transforms included in this task. |
| transform-source-record-time (?) / DEBUG | | The average time taken by this task to apply a specific transform included in this task. |
| | | The maximum time taken by this task to apply a specific transform included in this task. |
(?) transform-sink-record-time and transform-source-record-time would require an additional label for the transform alias or class name. The alias is lost after configuration and is not included in the Transformation API.
More granular metrics are recorded at DEBUG level to avoid performance impact.
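A minimal sketch of how the INFO/DEBUG split could keep per-record overhead low. It assumes a simplified, hypothetical `ConvertTimingSketch` instead of Kafka's real `Sensor` and recording-level machinery, a single stand-in converter function for key, value, and headers, and plain accumulated nanoseconds instead of avg/max statistics. The combined convert time is always measured; the key, value, and header timers only run when the configured level is DEBUG.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch; Kafka's real metrics use sensors with avg/max stats
// and a configured recording level.
final class ConvertTimingSketch {

    enum Level { INFO, DEBUG }

    private final boolean debugEnabled;
    long totalNanos;                       // INFO: key + value + headers together
    final long[] partNanos = new long[3];  // DEBUG: [key, value, headers]

    ConvertTimingSketch(Level configuredLevel) {
        this.debugEnabled = configuredLevel == Level.DEBUG;
    }

    // Converts one record's key, value and headers, recording timings by level.
    byte[][] convert(Function<String, byte[]> converter, String key, String value, String headers) {
        String[] parts = {key, value, headers};
        byte[][] out = new byte[3][];
        long start = System.nanoTime();
        for (int i = 0; i < parts.length; i++) {
            if (debugEnabled) {
                long partStart = System.nanoTime();
                out[i] = converter.apply(parts[i]);
                partNanos[i] += System.nanoTime() - partStart;
            } else {
                out[i] = converter.apply(parts[i]); // no extra clock reads at INFO
            }
        }
        totalNanos += System.nanoTime() - start;
        return out;
    }

    public static void main(String[] args) {
        Function<String, byte[]> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        ConvertTimingSketch info = new ConvertTimingSketch(Level.INFO);
        ConvertTimingSketch debug = new ConvertTimingSketch(Level.DEBUG);
        for (int i = 0; i < 1_000; i++) {
            info.convert(utf8, "k" + i, "v" + i, "h" + i);
            debug.convert(utf8, "k" + i, "v" + i, "h" + i);
        }
        System.out.println("INFO  total ns: " + info.totalNanos
                + ", parts: " + java.util.Arrays.toString(info.partNanos));
        System.out.println("DEBUG total ns: " + debug.totalNanos
                + ", parts: " + java.util.Arrays.toString(debug.partNanos));
    }
}
```

In this sketch, INFO costs two clock reads per record, and DEBUG adds two more per converted part, which is the kind of overhead the DEBUG level is meant to keep off by default.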
Proposed Changes
```
┌───────────────────────────────────────────────────────────────────────────────────────────┐
│ Source Connector Task │
│ ┌──────────────────┐ ┌────────────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │ Connector ├─►│ TransformChain ├───────►│ Convert ├────────►│ Producer │ │
│ │ Implementation │ └────────────────┘ └─────────┘ │ │ │ ┌───────┐
┌────────────┐ │ └──────────────────┘ *[tx.chain-lat] INFO *[convert-latency] INFO └─────────────────┘ ├─►│ Kafka │
│ Ext.System ├──►│ [poll-source-batch] ┌────┬────┬───┬────┐ ┌────────────────────┐ [request-latency] │ └───────┘
└────────────┘ │ │ Tx1│ Tx2│...│TxN │ │Conv.Key|Val|Headers│ │
│ └────┴────┴───┴────┘ └────────────────────┘ │
│ *[tx.N-lat] DEBUG *[convert-X-latency]DEBUG │
│ (=======batch=======)=(=========================per-record============================) │
└───────────────────────────────────────────────────────────────────────────────────────────┘
┌───────────────────────────────────────────────────────────────────────────────────────────┐
│ Sink Connector Task │
│ ┌──────────────────┐ ┌────────────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │ Connector │◄─┤ TransformChain │◄───────┤ Convert │◄────────┤ Consumer │ │
│ │ Implementation │ └────────────────┘ └─────────┘ │ │ │ ┌───────┐
┌────────────┐ │ └──────────────────┘ *[tx.chain-lat] INFO *[convert-latency] INFO └─────────────────┘ │◄─┤ Kafka │
│ Ext.System │◄──┤ [put-batch-latency] ┌────┬────┬───┬────┐ ┌────────────────────┐ [fetch-latency] │ └───────┘
└────────────┘ │ │ Tx1│ Tx2│...│TxN │ │Conv.Key|Val|Headers│ │
│ └────┴────┴───┴────┘ └────────────────────┘ │
│ *[tx.N-lat] DEBUG *[convert-X-latency]DEBUG │
│ (=====batch=======)===(===========per-record=======================)==(=====batch=====) │
└───────────────────────────────────────────────────────────────────────────────────────────┘
```
...
kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
...
- sink-record-batch-latency-max-ms
...
- sink-record-latency-max-ms
...
The maximum latency of a record, measured by comparing the record timestamp with the system time when it is received by the sink task
...
- sink-record-latency-avg-ms
...
- sink-record-convert-transform-time-max-ms
...
- sink-record-convert-transform-time-avg-ms
...
- sink-record-transform-time-max-ms
...
- sink-record-transform-time-avg-ms
...
kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
...
- source-record-transform-convert-time-max-ms
...
The maximum time in milliseconds taken by this task to transform and convert a record.
...
- source-record-transform-convert-time-avg-ms
...
(*) New metrics
Source connectors have the following stages:
- Poll records: the task implementation gathers a batch of records from the external system.
- Transform: apply the transformation chain to each record individually.
- Convert: convert each record to a `ProducerRecord` individually.
- Send records: send records to Kafka topics individually.
The poll records stage already has a latency metric: `poll-batch-time`.
The `transform-chain-source-record-time` and `convert-source-record-time` metrics will measure the transformations applied and the conversion from `SourceRecord` into `ProducerRecord`.
The send records stage can be monitored with producer sender metrics, e.g. `request-latency-avg/max`.
The metrics recorded at DEBUG level are at that level because they are recorded per individual record, whereas all other metrics are recorded per batch.
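The transform stage above can be sketched in Java as follows, assuming hypothetical `TimedTransformChain` and `NamedTransform` types in which records are plain strings and transforms are `UnaryOperator<String>`; this is not the Kafka Connect `Transformation` or transformation-chain API. The sketch times the whole chain for every record and, only when a DEBUG-style flag is set, each transform individually, keyed by its alias. As noted for `transform-source-record-time`, such a per-transform breakdown needs the alias (or class name) to be available at runtime.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch; not the Kafka Connect transformation chain API.
final class TimedTransformChain {

    // A transform paired with the alias it was configured under.
    static final class NamedTransform {
        final String alias;
        final UnaryOperator<String> fn;

        NamedTransform(String alias, UnaryOperator<String> fn) {
            this.alias = alias;
            this.fn = fn;
        }
    }

    private final List<NamedTransform> transforms;
    private final boolean debugEnabled;
    long chainNanos;                                                   // whole chain (INFO-style)
    final Map<String, Long> perTransformNanos = new LinkedHashMap<>(); // per alias (DEBUG-style)

    TimedTransformChain(List<NamedTransform> transforms, boolean debugEnabled) {
        this.transforms = transforms;
        this.debugEnabled = debugEnabled;
    }

    // Applies the chain to one record; a transform returning null filters the record out.
    String apply(String record) {
        long start = System.nanoTime();
        String current = record;
        for (NamedTransform t : transforms) {
            long tStart = debugEnabled ? System.nanoTime() : 0L;
            current = t.fn.apply(current);
            if (debugEnabled) {
                perTransformNanos.merge(t.alias, System.nanoTime() - tStart, Long::sum);
            }
            if (current == null) {
                break; // record dropped: skip the remaining transforms
            }
        }
        chainNanos += System.nanoTime() - start;
        return current;
    }

    public static void main(String[] args) {
        TimedTransformChain chain = new TimedTransformChain(List.of(
                new NamedTransform("upper", String::toUpperCase),
                new NamedTransform("dropEmpty", s -> s.isEmpty() ? null : s)), true);
        for (String r : List.of("a", "", "c")) {
            System.out.println("'" + r + "' -> " + chain.apply(r));
        }
        System.out.println("chain ns: " + chain.chainNanos + ", per transform: " + chain.perTransformNanos);
    }
}
```

Keying by alias is a choice of this sketch; keying by class name would merge the timings of two configured instances of the same transform class.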
...
Sink connectors have 3 stages:
- Consumer poll: gather a batch of consumer records.
- Convert/transform: convert records individually to the generic `SinkRecord`, apply the transformation chain, and prepare batches.
- Process: put record batches into an external system.
...
To measure `sink-record-latency`, it is proposed to compute the difference between the record timestamp and the current system time (wall-clock) just before the convert stage, since records are already being iterated at that point.
The convert and transform latencies, `convert-sink-record-time` and `transform-chain-sink-record-time`, measure conversion and transformation per record.
Polling can be monitored with consumer fetch metrics, e.g. `fetch-latency-avg/max`.
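The measurement point proposed for `sink-record-latency` can be sketched as a single loop over the polled batch. This minimal Java sketch assumes a hypothetical `PolledRecord` holding only a timestamp and a payload, `UnaryOperator<String>` stand-ins for conversion and the transformation chain, and one wall-clock reading passed in per batch (for testability); it is not the actual sink task implementation. The latency sample is taken when each record is first visited, before conversion, so no extra pass over the batch is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch of a sink task iterating a polled batch.
final class SinkIterationSketch {

    // Minimal stand-in for a consumer record: timestamp plus payload.
    static final class PolledRecord {
        final long timestampMs;
        final String payload;

        PolledRecord(long timestampMs, String payload) {
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    final List<Long> latencySamplesMs = new ArrayList<>(); // sink-record-latency samples
    long convertNanos;                                      // convert stage timing
    long transformNanos;                                    // transform chain timing

    List<String> process(List<PolledRecord> batch, long wallClockMs,
                         UnaryOperator<String> convert, UnaryOperator<String> transform) {
        List<String> toPut = new ArrayList<>();
        for (PolledRecord r : batch) {
            // 1. Latency sample: right after poll, before conversion.
            latencySamplesMs.add(wallClockMs - r.timestampMs);

            // 2. Convert (stand-in for key/value/header conversion).
            long t0 = System.nanoTime();
            String converted = convert.apply(r.payload);
            long t1 = System.nanoTime();

            // 3. Apply the transformation chain.
            String transformed = transform.apply(converted);
            long t2 = System.nanoTime();

            convertNanos += t1 - t0;
            transformNanos += t2 - t1;
            if (transformed != null) {
                toPut.add(transformed); // records to hand to the connector's put()
            }
        }
        return toPut;
    }

    public static void main(String[] args) {
        SinkIterationSketch task = new SinkIterationSketch();
        List<PolledRecord> batch = List.of(new PolledRecord(1_000L, "a"), new PolledRecord(2_500L, "b"));
        List<String> out = task.process(batch, 3_000L, s -> "conv:" + s, String::toUpperCase);
        System.out.println(out + " latencies=" + task.latencySamplesMs);
    }
}
```

Taking the sample inside the loop that already feeds the converter matches the rationale given above: the records are being iterated at that point anyway.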
On the other hand, the source connector has these stages:
- Polling: gather a batch of records from the external system.
- Transform/convert: apply transformations and convert records to `ProducerRecord` individually.
- Send records: send records to Kafka topics individually.
The polling stage already has a latency metric: `poll-batch-time`.
The `source-record-transform-convert-time` metric will measure the transformations applied and the conversion from `SourceRecord` into `ProducerRecord`.
The send records stage can be monitored with producer sender metrics, e.g. `request-latency-avg/max`.
Compatibility, Deprecation, and Migration Plan
...