...
Motivation
Currently, Sink and Source Connectors include latency metrics covering only the time spent interacting with external systems: the put-batch-latency metric measures the time to sink a batch of records into an external system, and the poll-batch-time metric measures the time to poll a batch of records from an external system.
At the moment it is difficult to understand how much latency the connector introduces to the process, and how long after being written a record is processed.
To observe a connector's performance and measure its complete end-to-end latency from sources to sinks, additional measurements are needed:
- In the source connector:
  - After polling, transformations and conversions happen before the records are sent to Kafka.
- In the sink connector:
  - Record latency: wall-clock time - record timestamp, to evaluate how late records are processed.
  - Convert and transform time before sending records to an external system.
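
The record latency definition above (wall-clock time minus record timestamp) can be illustrated with a minimal sketch. The class and method names here are hypothetical and are not part of Kafka Connect; clamping negative values to zero is an assumption made to tolerate clock skew between producer and worker, not something the proposal specifies.

```java
import java.util.List;

/** Hypothetical helper illustrating record latency; not a Kafka Connect API. */
final class SinkRecordLatencySketch {

    /** Latency of one record: wall-clock time minus record timestamp (assumed clamp at zero). */
    static long latencyMs(long wallClockMs, long recordTimestampMs) {
        return Math.max(0L, wallClockMs - recordTimestampMs);
    }

    /** Maximum latency in a batch, i.e. the latency of the oldest record; 0 for an empty batch. */
    static long maxLatencyMs(long wallClockMs, List<Long> recordTimestampsMs) {
        return recordTimestampsMs.stream()
                .mapToLong(ts -> latencyMs(wallClockMs, ts))
                .max()
                .orElse(0L);
    }

    /** Average latency across a batch; 0.0 for an empty batch. */
    static double avgLatencyMs(long wallClockMs, List<Long> recordTimestampsMs) {
        return recordTimestampsMs.stream()
                .mapToLong(ts -> latencyMs(wallClockMs, ts))
                .average()
                .orElse(0.0);
    }
}
```

In a real sink task the wall-clock value would presumably be taken once, when the batch is received, so that every record in the batch is compared against the same instant.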
With these metrics, it will be clearer how much latency the sink connector introduces, and where the bottleneck may be.

    +------------+  +-------------------------------(source-connector)---------------------------+
    | Ext.System |->|-(poll-batch-time)->([*]transform-convert-time)->(producer-request-latency) |-> Kafka
    +------------+  +----------------------------------------------------------------------------+
    +------------+  +-------------------------------(sink-connector)----------------------------------------------+
    | Ext.System |<-|-(put-batch-latency)<-([*]convert-transform-time)<-([*]sink-record-latency)<-(fetch-latency)-|<- Kafka
    +------------+  +---------------------------------------------------------------------------------------------+

With these enhanced metrics available, operators/developers could:
- Monitor and alert when sink processing happens later than the accepted latency (e.g. > 5 secs)
  - Current workaround:
    - Inserting the record timestamp into the target system and handling the calculations there.
- Observe the processing lifecycle, monitor spikes caused by convert and transform steps, and reduce the time to remediation
  - Current workarounds:
    - Inferring the connector's latency by capturing the timestamp of the source system and comparing it with the Kafka record timestamp, which is not trivial.
    - The current poll/put and produce/fetch latencies are not enough to infer the time taken in between.
    - Performance testing transforms in non-production environments to measure how latency increases in relation to throughput.
Public Interfaces
The following metrics would be added at the Task Level:
...
Attribute name | Recording level | Description |
---|---|---|
| INFO | The maximum latency of a record batch, measured by comparing the oldest record timestamp in a batch with the system time when it is received by the Sink task. |
| DEBUG | The maximum latency of a record, measured by comparing the record timestamp with the system time when it is received by the Sink task. |
| DEBUG | The average latency of a record, measured by comparing the record timestamp with the system time when it is received by the Sink task. |
| INFO | The maximum time taken by this task to convert and transform a record batch. |
| DEBUG | The maximum time taken by this task to convert a record. |
| DEBUG | The average time taken by this task to convert a record. |
| DEBUG | The maximum time taken by this task to transform a record. |
| DEBUG | The average time taken by this task to transform a record. |
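
As a rough illustration of how the maximum and average convert or transform times described in the table could be collected, the sketch below wraps a single step and times it. Everything here is hypothetical: the class name, the injected nanosecond clock, and the use of plain fields instead of Kafka's metrics sensors are assumptions made to keep the example self-contained, not the proposed implementation.

```java
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical timer for a convert or transform step; not a Kafka Connect API. */
final class ConvertTransformTimerSketch {
    private final LongSupplier nanoClock; // e.g. System::nanoTime in production use
    private long maxNanos;
    private long totalNanos;
    private long count;

    ConvertTransformTimerSketch(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    /** Runs the step on the input and records how long it took, even if the step throws. */
    <I, O> O time(I input, Function<I, O> step) {
        long start = nanoClock.getAsLong();
        try {
            return step.apply(input);
        } finally {
            long elapsed = nanoClock.getAsLong() - start;
            maxNanos = Math.max(maxNanos, elapsed);
            totalNanos += elapsed;
            count++;
        }
    }

    /** Longest observed duration in nanoseconds; 0 if nothing was timed. */
    long maxNanos() {
        return maxNanos;
    }

    /** Mean observed duration in nanoseconds; 0.0 if nothing was timed. */
    double avgNanos() {
        return count == 0 ? 0.0 : (double) totalNanos / count;
    }
}
```

Passing the clock in as a `LongSupplier` keeps the sketch deterministic under test; a caller would typically construct it with `System::nanoTime`.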
kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
...