Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Sink Connectors include only the putBatchLatency metric, which measures the time taken to sink a batch of records into an external system.
In order to observe connector performance and measure the complete end-to-end latency of a Sink Connector, additional metrics are needed:
- record latency: `wall-clock time - record timestamp`, to evaluate how late records are processed.
- convert time: the latency to convert and transform a batch of records, to know how long conversion and transformation are taking.
With these metrics, it will be clearer how much latency the sink connector introduces, and where the bottleneck may be.
For source connectors, a convert-time metric can be added to improve latency monitoring and achieve parity with the sink connector metrics.
Public Interfaces
The following metrics would be added at the Task Level:
kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
| Attribute name | Description |
|---|---|
| sink-record-latency-max | The maximum latency of a record, measured by comparing the record timestamp with the system time when it was received by the Sink task |
| sink-record-latency-avg | The average latency of a record, measured by comparing the record timestamp with the system time when it was received by the Sink task |
| sink-record-convert-transform-time-max | The maximum time in milliseconds taken by this task to convert and transform a record |
| sink-record-convert-transform-time-avg | The average time in milliseconds taken by this task to convert and transform a record |
kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
| Attribute name | Description |
|---|---|
| source-record-transform-convert-time-max | The maximum time in milliseconds taken by this task to transform and convert a record |
| source-record-transform-convert-time-avg | The average time in milliseconds taken by this task to transform and convert a record |
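As a usage sketch, the new attributes would be read from the task-level MBeans through JMX. The snippet below only shows how such an object name is built; the connector name and task id are placeholders:

```java
import javax.management.ObjectName;

public class ConnectMetricNames {
    // Builds the JMX object name for a sink task's metric group.
    // "my-connector" and task 0 below are placeholder values.
    public static ObjectName sinkTaskMetrics(String connector, int task) throws Exception {
        return new ObjectName(String.format(
            "kafka.connect:type=sink-task-metrics,connector=%s,task=%d",
            connector, task));
    }

    public static void main(String[] args) throws Exception {
        ObjectName name = sinkTaskMetrics("my-connector", 0);
        // In a running worker, an MBeanServer lookup such as
        //   server.getAttribute(name, "sink-record-latency-max")
        // would then return the proposed attribute value.
        System.out.println(name);
    }
}
```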
Proposed Changes
Sink Connectors have 3 stages:
- polling: gather a batch of consumer records
- convert/transform: convert records individually to a generic SinkRecord, apply transformations, and prepare batches
- process: put record batches into an external system

The process stage already has a latency metric: put-batch-time.
To measure sink-record-latency, it is proposed to compute the difference between the record timestamp and the current system time (wall-clock) at the beginning of the convert stage, since that is the point where records are first iterated.
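The measurement above can be sketched as follows. This is an illustrative stand-in, not the actual Connect implementation; the class and method names are hypothetical, and the plain max/avg fields stand in for Kafka's metric stats:

```java
// Hypothetical sketch of sink-record-latency, computed per record at the
// start of the convert stage as wall-clock time minus record timestamp.
public class SinkRecordLatencySketch {
    private long maxMs = Long.MIN_VALUE; // stands in for a Max metric stat
    private double sumMs = 0;            // running sum for the Avg stat
    private long count = 0;

    // Called once per record as the batch is iterated for conversion.
    public void record(long recordTimestampMs, long wallClockMs) {
        long latencyMs = wallClockMs - recordTimestampMs;
        maxMs = Math.max(maxMs, latencyMs);
        sumMs += latencyMs;
        count++;
    }

    public long max() { return maxMs; }
    public double avg() { return sumMs / count; }

    public static void main(String[] args) {
        SinkRecordLatencySketch m = new SinkRecordLatencySketch();
        long now = 1_000_000L;
        m.record(now - 250, now); // record produced 250 ms before receipt
        m.record(now - 50, now);  // record produced 50 ms before receipt
        System.out.println("max=" + m.max() + " avg=" + m.avg());
    }
}
```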
The convert latency metric, sink-record-convert-transform-time, measures conversion and transformation per record.
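One way the per-record timing could look, as a sketch; the helper name and wiring are hypothetical, and the real WorkerSinkTask code differs:

```java
// Hypothetical timer wrapping one record's convert + transform step.
public class ConvertTransformTimer {
    private long totalNanos = 0;
    private long count = 0;

    // Times a single record's conversion + transformation, passed as a supplier.
    public <R> R timed(java.util.function.Supplier<R> convertAndTransform) {
        long start = System.nanoTime();
        R out = convertAndTransform.get();
        totalNanos += System.nanoTime() - start;
        count++;
        return out;
    }

    public long count() { return count; }
    public double avgMillis() { return count == 0 ? 0.0 : (totalNanos / 1_000_000.0) / count; }

    public static void main(String[] args) {
        ConvertTransformTimer timer = new ConvertTransformTimer();
        // The lambda stands in for converter + transformation chain work.
        String converted = timer.timed(() -> "record".toUpperCase());
        System.out.println(converted + " avgMillis=" + timer.avgMillis());
    }
}
```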
The polling stage can be monitored with Consumer fetch metrics, e.g. fetch-latency-avg/max.
Source Connectors, on the other hand, have 3 stages:
- polling: gather a batch of records from the external system
- transform/convert: apply transformations and convert records individually to ProducerRecord
- send records: send records to Kafka topics individually

The polling stage already has a latency metric: poll-batch-time.
The source-record-transform-convert-time metric will measure the time to apply transformations and convert a SourceRecord into a ProducerRecord.
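To show where this measurement would sit relative to the existing metrics, here is a simplified sketch of the source task loop; all types and methods are stand-ins, not actual WorkerSourceTask code:

```java
import java.util.List;

// Sketch of the source task loop, marking which stage each metric covers.
public class SourceLoopSketch {
    static List<String> poll() { return List.of("a", "b"); }            // covered by poll-batch-time
    static String transformAndConvert(String rec) { return rec + "!"; } // the proposed measured span
    static void send(String rec) { /* covered by producer request-latency-avg/max */ }

    public static void main(String[] args) {
        long totalNanos = 0;
        for (String rec : poll()) {
            long start = System.nanoTime();
            // source-record-transform-convert-time: SourceRecord -> ProducerRecord
            String producerRecord = transformAndConvert(rec);
            totalNanos += System.nanoTime() - start;
            send(producerRecord);
        }
        System.out.println("transform+convert nanos for batch: " + totalNanos);
    }
}
```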
The send stage can be monitored with Producer sender metrics, e.g. request-latency-avg/max.
Compatibility, Deprecation, and Migration Plan
N/A
Rejected Alternatives