...

Motivation

Currently, Sink and Source Connectors include latency metrics covering only the time spent interacting with the external systems: the put-batch-latency metric measures the time to sink a batch of records into an external system, and the poll-batch-time metric measures the time to poll a batch of records from an external system.

At the moment it is difficult to understand how much latency the connector introduces to the process, and how long after being written a record is processed.

In order to observe a connector's performance and measure its complete end-to-end latency from sources to sinks, additional measurements are needed:

  • In the source connector:
    • After polling, there are transformations and conversions that happen before the records are sent to Kafka.
  • In the sink connector:
    • Record latency: wall-clock time - record timestamp, to evaluate how late records are processed.
    • Convert and transform time before sending records to an external system.
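The two sink-side measurements listed above can be sketched in plain Java. This is only an illustration under stated assumptions: the class `SinkLatencySketch` and its methods are hypothetical names, not part of Kafka Connect, and the record timestamp is assumed to be an epoch-millisecond value comparable to the worker's wall clock.

```java
import java.util.function.UnaryOperator;

/** Hypothetical helper for illustration only; not part of Kafka Connect. */
final class SinkLatencySketch {

    /** Record latency = wall-clock time - record timestamp (both in epoch milliseconds). */
    static long recordLatencyMs(long wallClockMs, long recordTimestampMs) {
        return wallClockMs - recordTimestampMs;
    }

    /** Times one convert or transform step with the monotonic clock; returns elapsed nanoseconds. */
    static <T> long timeStepNanos(UnaryOperator<T> step, T input) {
        long start = System.nanoTime();
        step.apply(input);
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long recordTimestamp = now - 1_500L; // assumption: record was written 1.5 s ago
        System.out.println("record latency ms: " + recordLatencyMs(now, recordTimestamp));

        // Stand-in for a transform: upper-casing a string payload.
        long elapsed = timeStepNanos(s -> s.toUpperCase(), "payload");
        System.out.println("transform time ns: " + elapsed);
    }
}
```

Record latency depends on clock agreement between whoever set the record timestamp and the Connect worker, so skewed clocks can produce small or even negative values. The convert and transform timing uses `System.nanoTime()` because it only measures elapsed time within one process.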


Code Block
+------------+  +-------------------------------(source-connector)---------------------------+
| Ext.System |->|-(poll-batch-time)->([*]transform-convert-time)->(producer-request-latency) |-> Kafka
+------------+  +----------------------------------------------------------------------------+

+------------+  +-------------------------------(sink-connector)----------------------------------------------+
| Ext.System |<-|-(put-batch-latency)<-([*]convert-transform-time)<-([*]sink-record-latency)<-(fetch-latency)-|<- Kafka
+------------+  +---------------------------------------------------------------------------------------------+

With these metrics, it will be clearer how much latency the sink connector introduces, and where the bottleneck may be.


With these enhanced metrics available, operators/developers could:

  • Monitor and alert when sink processing exceeds the accepted latency (e.g. > 5 seconds).
    • Current workaround:
      • Inserting the record timestamp into the target system and performing the calculations there.
  • Observe the processing lifecycle, monitor spikes caused by conversions and transforms, and reduce the time to remediation.
    • Current workaround:
      • Infer the connector's latency by capturing the timestamp of the source system and comparing it with the Kafka record timestamp, which is not trivial.
      • The current poll/put and produce/fetch latencies are not enough to infer the time taken in between.
  • Performance-test transforms in non-production environments to measure how latency increases in relation to throughput.

Public Interfaces

The following metrics would be added at the Task Level:

...

Attribute name | Recording level | Description
  • sink-record-batch-latency-max-ms | INFO | The maximum latency of a record batch, measured by comparing the oldest record timestamp in the batch with the system time when the batch is received by the Sink task.
  • sink-record-latency-max-ms | DEBUG | The maximum latency of a record, measured by comparing the record timestamp with the system time when the record is received by the Sink task.
  • sink-record-latency-avg-ms | DEBUG | The average latency of a record, measured by comparing the record timestamp with the system time when the record is received by the Sink task.
  • sink-record-convert-transform-time-max-ms | DEBUG/INFO | The maximum time taken by this task to convert and transform a record batch.
  • sink-record-convert-transform-time-max-ms | DEBUG | The maximum time taken by this task to convert a record.
  • sink-record-convert-transform-time-avg-ms | DEBUG | The average time taken by this task to convert a record.
  • sink-record-transform-time-max-ms | DEBUG | The maximum time taken by this task to transform a record.
  • sink-record-transform-time-avg-ms | DEBUG | The average time taken by this task to transform a record.
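To make the difference between the batch-level and record-level latency descriptions concrete, here is a minimal sketch of how such values could be derived for one received batch. The class `LatencyStatsSketch` and its methods are hypothetical and written for illustration; how the metrics are actually recorded inside the task is not specified here.

```java
import java.util.List;

/** Hypothetical calculations for illustration only; not the Connect implementation. */
final class LatencyStatsSketch {

    /** Batch latency: receive time minus the oldest record timestamp in the batch. */
    static long batchLatencyMs(long receivedAtMs, List<Long> recordTimestampsMs) {
        if (recordTimestampsMs.isEmpty()) {
            throw new IllegalArgumentException("batch must contain at least one record");
        }
        long oldest = Long.MAX_VALUE;
        for (long ts : recordTimestampsMs) {
            oldest = Math.min(oldest, ts);
        }
        return receivedAtMs - oldest;
    }

    /** Average per-record latency: mean of (receive time - record timestamp). */
    static double avgRecordLatencyMs(long receivedAtMs, List<Long> recordTimestampsMs) {
        if (recordTimestampsMs.isEmpty()) {
            throw new IllegalArgumentException("batch must contain at least one record");
        }
        long total = 0L;
        for (long ts : recordTimestampsMs) {
            total += receivedAtMs - ts;
        }
        return (double) total / recordTimestampsMs.size();
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(900L, 950L, 1_000L); // made-up epoch-ms values
        System.out.println("batch latency ms: " + batchLatencyMs(1_000L, timestamps));
        System.out.println("avg record latency ms: " + avgRecordLatencyMs(1_000L, timestamps));
    }
}
```

The batch value needs only one comparison per batch once the oldest timestamp is known, whereas the per-record values touch every record. That cost difference is one plausible reason for the batch metric being listed at INFO and the per-record metrics at DEBUG.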


kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"

...