
Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Draft implementation: https://github.com/apache/kafka/pull/12647

Motivation

Currently, Sink and Source Connectors include latency metrics that cover only the time spent interacting with the external systems: the put-batch-time metrics (put-batch-avg-time-ms, put-batch-max-time-ms) measure the time to put a batch of records into the external system, and the poll-batch-time metrics (poll-batch-avg-time-ms, poll-batch-max-time-ms) measure the time to poll a batch of records from the external system.

At the moment, it is difficult to understand how much latency the connector introduces into the pipeline, and how long after being written to Kafka a record is processed.

To observe a connector's performance and measure the complete end-to-end latency from sources to sinks, the following additional measurements are needed:

  • In the source connector:
    • After polling, there are transformations and conversions that happen before the records are sent to Kafka.
  • In the sink connector:
    • Record latency: wall-clock time minus record timestamp, to evaluate how late records are processed.
    • Convert and transform time before sending records to an external system.

                 ┌───────────────────────────────────────────────────────────────────────────────────────────┐
                 │                                   Source Connector Task                                   │
                 │ ┌──────────────────┐  ┌────────────────┐        ┌─────────┐         ┌─────────────────┐   │
                 │ │ Connector        ├─►│ TransformChain ├───────►│ Convert ├────────►│     Producer    │   │
                 │ │ Implementation   │  └────────────────┘        └─────────┘         │                 │   │  ┌───────┐
┌────────────┐   │ └──────────────────┘  *[tx.chain-lat] INFO  *[convert-latency] INFO └─────────────────┘   ├─►│ Kafka │
│ Ext.System ├──►│  [poll-source-batch]   ┌────┬────┬───┬────┐   ┌────────────────────┐ [request-latency]    │  └───────┘
└────────────┘   │                        │ Tx1│ Tx2│...│TxN │   │Conv.Key|Val|Headers│                      │
                 │                        └────┴────┴───┴────┘   └────────────────────┘                      │
                 │                        *[tx.N-lat] DEBUG    *[convert-X-latency]DEBUG                     │
                 │ (=======batch=======)=(=========================per-record============================)   │
                 └───────────────────────────────────────────────────────────────────────────────────────────┘


                 ┌───────────────────────────────────────────────────────────────────────────────────────────┐
                 │                                     Sink Connector Task                                   │
                 │ ┌──────────────────┐  ┌────────────────┐        ┌─────────┐         ┌─────────────────┐   │
                 │ │ Connector        │◄─┤ TransformChain │◄───────┤ Convert │◄────────┤     Consumer    │   │
                 │ │ Implementation   │  └────────────────┘        └─────────┘         │                 │   │  ┌───────┐
┌────────────┐   │ └──────────────────┘  *[tx.chain-lat] INFO  *[convert-latency] INFO └─────────────────┘   │◄─┤ Kafka │
│ Ext.System │◄──┤  [put-batch-latency]   ┌────┬────┬───┬────┐   ┌────────────────────┐  [fetch-latency]     │  └───────┘
└────────────┘   │                        │ Tx1│ Tx2│...│TxN │   │Conv.Key|Val|Headers│                      │
                 │                        └────┴────┴───┴────┘   └────────────────────┘                      │
                 │                        *[tx.N-lat] DEBUG    *[convert-X-latency]DEBUG                     │
                 │ (=====batch=======)===(===========per-record=======================)==(=====batch=====)   │
                 └───────────────────────────────────────────────────────────────────────────────────────────┘


With these enhanced metrics available, operators/developers could:

  • Monitor and alert when sink processing exceeds an acceptable latency (e.g. > 5 seconds).
    • Current workaround:
      • Insert the record timestamp into the target system and compute the latency there.
  • Observe the processing lifecycle, monitor spikes caused by conversion and transformations, and reduce the time to remediation.
    • Current workaround:
      • Infer the connector's latency by capturing the timestamp in the source system and comparing it with the Kafka record timestamp, which is not trivial.
      • The current poll/put and produce/fetch latencies are not enough to infer the time taken in between.
  • Performance-test transforms in non-production environments to measure how latency increases with throughput.

Public Interfaces

The following metrics would be added at the Task Level:

kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"

Attribute name (recording level): description

  • sink-record-batch-latency-max-ms (INFO): The maximum latency of a record batch, measured by comparing the oldest record timestamp in the batch with the system time at which the batch is received by the sink task.
  • sink-record-latency-max-ms (DEBUG): The maximum latency of a record, measured by comparing the record timestamp with the system time at which the record is received by the sink task.
  • sink-record-latency-avg-ms (DEBUG): The average latency of a record, measured by comparing the record timestamp with the system time at which the record is received by the sink task.
  • sink-record-convert-transform-time-max-ms (INFO): The maximum time taken by this task to convert and transform a record batch.
  • sink-record-convert-transform-time-max-ms (DEBUG): The maximum time taken by this task to convert a record.
  • sink-record-convert-transform-time-avg-ms (DEBUG): The average time taken by this task to convert a record.
  • sink-record-transform-time-max-ms (DEBUG): The maximum time taken by this task to transform a record.
  • sink-record-transform-time-avg-ms (DEBUG): The average time taken by this task to transform a record.
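As a rough illustration of the INFO-level sink-record-batch-latency-max-ms definition, the Java sketch below computes the latency of one batch as the wall-clock time at which the batch is received minus the oldest record timestamp in it, and keeps the maximum across batches. The class and method names are hypothetical, and the plain cumulative maximum is a simplification: Kafka's metrics library records values into time-windowed samples rather than keeping an all-time maximum.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical tracker for a sink-record-batch-latency-max-ms style value. */
final class BatchLatencyTracker {
    private long maxBatchLatencyMs = Long.MIN_VALUE;

    /**
     * Records one polled batch. The batch latency is the wall-clock receive time
     * minus the oldest (smallest) record timestamp, i.e. the latency of the record
     * that waited longest. Returns empty for an empty batch.
     */
    OptionalLong recordBatch(List<Long> recordTimestampsMs, long receivedAtMs) {
        if (recordTimestampsMs.isEmpty()) {
            return OptionalLong.empty(); // nothing to measure for an empty poll
        }
        long oldestMs = Long.MAX_VALUE;
        for (long ts : recordTimestampsMs) {
            oldestMs = Math.min(oldestMs, ts);
        }
        long batchLatencyMs = receivedAtMs - oldestMs;
        maxBatchLatencyMs = Math.max(maxBatchLatencyMs, batchLatencyMs);
        return OptionalLong.of(batchLatencyMs);
    }

    /** Largest batch latency recorded so far, or Long.MIN_VALUE if none was recorded. */
    long maxBatchLatencyMs() {
        return maxBatchLatencyMs;
    }

    public static void main(String[] args) {
        BatchLatencyTracker tracker = new BatchLatencyTracker();
        tracker.recordBatch(List.of(1_000L, 1_500L, 1_200L), 6_000L); // batch latency 5000 ms
        tracker.recordBatch(List.of(7_000L, 7_100L), 9_000L);         // batch latency 2000 ms
        System.out.println(tracker.maxBatchLatencyMs());              // prints 5000
    }
}
```

Using only the oldest timestamp keeps this measurement to a single pass per batch, which is why it can stay at INFO level while the per-record latency metrics are DEBUG.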


kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"

Attribute name (recording level): description

  • source-record-transform-convert-time-max-ms (DEBUG): The maximum time in milliseconds taken by this task to transform and convert a record.
  • source-record-transform-convert-time-avg-ms (DEBUG): The average time in milliseconds taken by this task to transform and convert a record.


Some metrics are recorded at DEBUG level because they are measured for each individual record, whereas all other metrics are measured per batch.
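The INFO/DEBUG split can be pictured as a level check in front of each per-record measurement, so that the per-record cost is only paid when the worker is configured to record DEBUG metrics (Connect workers expose this through the metrics.recording.level setting). The enum and class below are hypothetical stand-ins that only mimic the idea; they are not Kafka's Sensor API, which also has a TRACE level.

```java
/** Hypothetical recording levels, ordered from least to most verbose. */
enum RecordingLevel { INFO, DEBUG }

/** Hypothetical latency metric that only records values when its level is enabled. */
final class GatedLatencyMetric {
    private final RecordingLevel metricLevel;
    private final RecordingLevel configuredLevel;
    private long count;
    private long sumMs;
    private long maxMs = Long.MIN_VALUE;

    GatedLatencyMetric(RecordingLevel metricLevel, RecordingLevel configuredLevel) {
        this.metricLevel = metricLevel;
        this.configuredLevel = configuredLevel;
    }

    /** True when the configured level is at least as verbose as this metric's level. */
    boolean shouldRecord() {
        return configuredLevel.compareTo(metricLevel) >= 0;
    }

    void record(long valueMs) {
        if (!shouldRecord()) {
            return; // e.g. a per-record DEBUG metric on a worker configured for INFO
        }
        count++;
        sumMs += valueMs;
        maxMs = Math.max(maxMs, valueMs);
    }

    long count() { return count; }

    double avgMs() { return count == 0 ? Double.NaN : (double) sumMs / count; }

    long maxMs() { return maxMs; }
}
```

In practice a caller would check shouldRecord() before reading the clock at all, so that at INFO level the per-record timing adds no clock reads.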

Proposed Changes

A sink connector task has three stages:

  • polling: gather a batch of consumer records

  • convert/transform: convert each record individually into a generic SinkRecord, apply transformations, and prepare batches

  • process: put record batches into an external system.

The process stage already has a latency metric: put-batch-time.

To compute sink-record-latency, it is proposed to measure the difference between the record timestamp and the current system (wall-clock) time at the beginning of the convert stage, since records are already being iterated at that point.
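A minimal sketch of that measurement, assuming the task reads the wall clock once per record while iterating the polled batch: each record's latency is the current time minus its timestamp, and the running maximum and average correspond to sink-record-latency-max-ms and sink-record-latency-avg-ms. The Rec type and the injectable LongSupplier clock are assumptions made to keep the example self-contained and deterministic; a real task would iterate Kafka ConsumerRecord objects and read ConsumerRecord.timestamp().

```java
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a consumed record; only its timestamp matters here. */
final class Rec {
    final long timestampMs;

    Rec(long timestampMs) {
        this.timestampMs = timestampMs;
    }
}

/** Sketch: per-record sink latency measured at the start of the convert stage. */
final class SinkRecordLatency {
    private final LongSupplier clockMs; // wall clock, e.g. System::currentTimeMillis
    private long count;
    private long sumMs;
    private long maxMs = Long.MIN_VALUE;

    SinkRecordLatency(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    /** Iterates a polled batch, recording wall-clock time minus record timestamp. */
    void onBatch(List<Rec> batch) {
        for (Rec rec : batch) {
            long latencyMs = clockMs.getAsLong() - rec.timestampMs;
            count++;
            sumMs += latencyMs;
            maxMs = Math.max(maxMs, latencyMs);
            // ... conversion and transformation of rec would follow here ...
        }
    }

    long maxMs() { return maxMs; }

    double avgMs() { return count == 0 ? Double.NaN : (double) sumMs / count; }
}
```

Because the values come from a wall clock compared against a timestamp set elsewhere, they can be skewed (or even negative) if the clocks involved are not synchronized.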

The sink-record-convert-transform-time metric measures the time spent converting and transforming each record.
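The per-record convert-and-transform timing could be taken by reading a monotonic clock before conversion and again after the transformation chain returns. In this sketch the converter and the transformation chain are modeled as plain java.util.function.Function values and the clock as an injectable LongSupplier of nanoseconds; these are assumptions for illustration, not the Kafka Connect Converter or Transformation interfaces.

```java
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Sketch: timing convert + transform for each sink record. */
final class ConvertTransformTimer<I, R> {
    private final Function<I, R> convert;    // stand-in for key/value/header conversion
    private final Function<R, R> transforms; // stand-in for the transformation chain
    private final LongSupplier nanoClock;    // monotonic clock, e.g. System::nanoTime
    private long count;
    private long sumNanos;
    private long maxNanos;

    ConvertTransformTimer(Function<I, R> convert, Function<R, R> transforms, LongSupplier nanoClock) {
        this.convert = convert;
        this.transforms = transforms;
        this.nanoClock = nanoClock;
    }

    /** Converts, then transforms one record; the result may be null if a transformation drops it. */
    R process(I raw) {
        long start = nanoClock.getAsLong();
        R converted = convert.apply(raw);
        R transformed = transforms.apply(converted);
        long elapsed = nanoClock.getAsLong() - start;
        count++;
        sumNanos += elapsed;
        maxNanos = Math.max(maxNanos, elapsed);
        return transformed;
    }

    double maxMs() { return maxNanos / 1_000_000.0; }

    double avgMs() { return count == 0 ? Double.NaN : sumNanos / 1_000_000.0 / count; }
}
```

A monotonic clock is used here rather than wall-clock time because this measures an elapsed duration inside one process, where wall-clock adjustments would only add noise.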

The polling stage can be monitored with consumer fetch metrics, e.g. fetch-latency-avg and fetch-latency-max.


A source connector task, on the other hand, has the following stages:

  • polling: gather a batch of records from the external system

  • transform/convert: apply transformations and convert each record individually into a ProducerRecord
  • send records: send each record individually to Kafka topics

The polling stage already has a latency metric: poll-batch-time.

The source-record-transform-convert-time metric will measure the time taken to apply transformations and convert each SourceRecord into a ProducerRecord.
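On the source side the order is reversed relative to the sink: records are transformed first and converted afterwards. The sketch below times both steps together for each polled record, as source-record-transform-convert-time-max-ms and -avg-ms describe. The Function-based stand-ins, the injectable nanosecond clock, and the choice to still record the elapsed time for records that a transformation drops (in Kafka Connect a transformation returns null to filter a record) while skipping their conversion are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Sketch: source-side per-record transform-then-convert timing. */
final class SourceTransformConvert<S, P> {
    private final Function<S, S> transforms; // stand-in for the transformation chain; may return null
    private final Function<S, P> convert;    // stand-in for SourceRecord -> ProducerRecord conversion
    private final LongSupplier nanoClock;    // monotonic clock, e.g. System::nanoTime
    private long count;
    private long sumNanos;
    private long maxNanos;

    SourceTransformConvert(Function<S, S> transforms, Function<S, P> convert, LongSupplier nanoClock) {
        this.transforms = transforms;
        this.convert = convert;
        this.nanoClock = nanoClock;
    }

    /** Transforms and converts a polled batch, returning the records to send to Kafka. */
    List<P> prepare(List<S> polled) {
        List<P> toSend = new ArrayList<>();
        for (S record : polled) {
            long start = nanoClock.getAsLong();
            S transformed = transforms.apply(record);
            // Records dropped by a transformation are not converted (assumption).
            P converted = transformed == null ? null : convert.apply(transformed);
            long elapsed = nanoClock.getAsLong() - start;
            count++;
            sumNanos += elapsed;
            maxNanos = Math.max(maxNanos, elapsed);
            if (converted != null) {
                toSend.add(converted);
            }
        }
        return toSend;
    }

    double maxMs() { return maxNanos / 1_000_000.0; }

    double avgMs() { return count == 0 ? Double.NaN : sumNanos / 1_000_000.0 / count; }
}
```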

The send stage can be monitored with producer sender metrics, e.g. request-latency-avg and request-latency-max.

Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.


