  • Monitor and alert when sink processing happens later than an accepted latency (e.g. > 5 seconds).
    • Current workaround:
      • Insert the record timestamp into the target system and compute the latency there.
  • Observe the processing lifecycle, monitor spikes caused by converters and transforms, and reduce the time to remediation.
    • Current workaround:
      • Infer the connector's latency by capturing the timestamp of the source system and comparing it with the Kafka record timestamp, which is not trivial.
      • The current poll/put and produce/fetch latency metrics are not enough to infer the time taken in between.
  • Performance-test transforms in non-production environments to measure how latency increases with throughput.

Proposed Changes


Code Block
                 ┌───────────────────────────────────────────────────────────────────────────────────────────┐
                 │                                   Source Connector Task                                   │
                 │ ┌──────────────────┐  ┌────────────────┐        ┌─────────┐         ┌─────────────────┐   │
                 │ │ Connector        ├─►│ TransformChain ├───────►│ Convert ├────────►│     Producer    │   │
                 │ │ Implementation   │  └────────────────┘        └─────────┘         │                 │   │  ┌───────┐
┌────────────┐   │ └──────────────────┘  *[tx.chain-lat] INFO  *[convert-latency] INFO └─────────────────┘   ├─►│ Kafka │
│ Ext.System ├──►│  [poll-source-batch]   ┌────┬────┬───┬────┐   ┌────────────────────┐ [request-latency]    │  └───────┘
└────────────┘   │                        │ Tx1│ Tx2│...│TxN │   │Conv.Key|Val|Headers│                      │
                 │                        └────┴────┴───┴────┘   └────────────────────┘                      │
                 │                        *[tx.N-lat] DEBUG    *[convert-X-latency]DEBUG                     │
                 │ (=======batch=======)=(=========================per-record============================)   │
                 └───────────────────────────────────────────────────────────────────────────────────────────┘


                 ┌───────────────────────────────────────────────────────────────────────────────────────────┐
                 │                                     Sink Connector Task                                   │
                 │ ┌──────────────────┐  ┌────────────────┐        ┌─────────┐         ┌─────────────────┐   │
                 │ │ Connector        │◄─┤ TransformChain │◄───────┤ Convert │◄────────┤     Consumer    │   │
                 │ │ Implementation   │  └────────────────┘        └─────────┘         │                 │   │  ┌───────┐
┌────────────┐   │ └──────────────────┘  *[tx.chain-lat] INFO  *[convert-latency] INFO └─────────────────┘   │◄─┤ Kafka │
│ Ext.System │◄──┤  [put-batch-latency]   ┌────┬────┬───┬────┐   ┌────────────────────┐  [fetch-latency]     │  └───────┘
└────────────┘   │                        │ Tx1│ Tx2│...│TxN │   │Conv.Key|Val|Headers│                      │
                 │                        └────┴────┴───┴────┘   └────────────────────┘                      │
                 │                        *[tx.N-lat] DEBUG    *[convert-X-latency]DEBUG                     │
                 │ (=====batch=======)===(===========per-record=======================)==(=====batch=====)   │
                 └───────────────────────────────────────────────────────────────────────────────────────────┘

(?) transform-sink-record-time and transform-source-record-time would require an additional label for the transform alias or class name. The alias is currently lost when the configuration is processed and is not exposed by the Transformation API.

(*) New metrics

Source Connectors have 4 stages:

  • task implementation polling records: gather a batch of records from the external system
  • transform: apply the transformation chain to each record individually
  • convert: convert each record to a ProducerRecord individually
  • send records: send records to Kafka topics individually

The polling stage already has a latency metric: poll-batch-time.

The transform-chain-source-record-time and convert-source-record-time metrics will measure the time taken to apply the transformations and to convert a SourceRecord into a ProducerRecord.

The send stage can be monitored with the Producer sender metrics, e.g. request-latency-avg/max.
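The per-record timing of the transform and convert stages could be sketched roughly as follows. This is only an illustration under stated assumptions: StageTimer and SourcePipelineSketch are hypothetical names, records are modelled as plain strings, and the real Connect runtime would record these values through its own metrics sensors rather than a hand-written accumulator.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical accumulator (not a Kafka class): keeps the count, average and maximum in milliseconds.
class StageTimer {
    private long count;
    private double totalMs;
    private double maxMs;

    void record(double ms) {
        count++;
        totalMs += ms;
        maxMs = Math.max(maxMs, ms);
    }

    long count() { return count; }
    double avgMs() { return count == 0 ? 0.0 : totalMs / count; }
    double maxMs() { return maxMs; }
}

// Hypothetical stand-in for the transform and convert stages of a source task.
class SourcePipelineSketch {
    // Stands in for transform-chain-source-record-time.
    final StageTimer transformChainTime = new StageTimer();
    // Stands in for convert-source-record-time.
    final StageTimer convertTime = new StageTimer();

    // Applies every transform, then the converter, to one record and times each stage separately.
    byte[] process(String record,
                   List<UnaryOperator<String>> transforms,
                   Function<String, byte[]> converter) {
        long start = System.nanoTime();
        String current = record;
        for (UnaryOperator<String> transform : transforms) {
            current = transform.apply(current);
        }
        long afterTransforms = System.nanoTime();
        byte[] converted = converter.apply(current);
        long afterConvert = System.nanoTime();

        transformChainTime.record((afterTransforms - start) / 1_000_000.0);
        convertTime.record((afterConvert - afterTransforms) / 1_000_000.0);
        return converted;
    }
}
```

Timing the whole chain once per record keeps the overhead to two clock reads per stage, which is presumably why the chain-level and convert-level metrics are the ones proposed at INFO level.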

Sink Connectors have 4 stages:

  • consumer polling records: gather a batch of consumer records
  • convert: convert each record individually to a generic SinkRecord
  • transform: apply the transformation chain
  • process: put record batches into an external system

The process stage already has a latency metric: put-batch-time.

To measure sink-record-latency, it is proposed to take the difference between the record timestamp and the current system time (wall-clock) just before the convert stage, since records are already iterated at that point.
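A minimal sketch of that calculation, assuming record timestamps and the wall-clock time are both epoch milliseconds. SinkLatencySketch and its method names are hypothetical and exist only for illustration; the batch latency uses the oldest record timestamp in the batch, as described for sink-record-batch-latency.

```java
import java.util.List;

// Hypothetical sketch; class and method names are illustrative, not Kafka Connect APIs.
class SinkLatencySketch {
    private long recordCount;
    private long totalLatencyMs;
    private long maxRecordLatencyMs;
    private long maxBatchLatencyMs;

    // Intended to be called right after the consumer poll and before conversion.
    void onBatchPolled(List<Long> recordTimestampsMs, long nowMs) {
        if (recordTimestampsMs.isEmpty()) {
            return;
        }
        long oldestTimestampMs = Long.MAX_VALUE;
        for (long timestampMs : recordTimestampsMs) {
            // Per-record latency: wall-clock time minus record timestamp.
            long latencyMs = nowMs - timestampMs;
            recordCount++;
            totalLatencyMs += latencyMs;
            maxRecordLatencyMs = Math.max(maxRecordLatencyMs, latencyMs);
            oldestTimestampMs = Math.min(oldestTimestampMs, timestampMs);
        }
        // Batch latency: wall-clock time minus the oldest record timestamp in the batch.
        maxBatchLatencyMs = Math.max(maxBatchLatencyMs, nowMs - oldestTimestampMs);
    }

    double recordLatencyAvgMs() {
        return recordCount == 0 ? 0.0 : (double) totalLatencyMs / recordCount;
    }

    long recordLatencyMaxMs() { return maxRecordLatencyMs; }

    long batchLatencyMaxMs() { return maxBatchLatencyMs; }
}
```

In a running task, nowMs would be something like System.currentTimeMillis(). Because the record timestamp is set outside the worker, clock skew can make the difference negative; this sketch does not treat that case specially.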

convert-sink-record-time and transform-chain-sink-record-time measure the conversion and transformation time per record.

Polling can be monitored with the Consumer fetch metrics, e.g. fetch-latency-avg/max.

Public Interfaces

The following metrics would be added at the Task Level:

kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"

Sensor (Recording Level)
  • Attribute name: Description

sink-record-batch-latency (INFO)

  • sink-record-batch-latency-max-ms: The maximum latency of a record batch, measured by comparing the oldest record timestamp in a batch with the system time (i.e. wallclock) when it has been received by the Sink task right after consumer poll and before conversions.

sink-record-latency (DEBUG)

  • sink-record-latency-max-ms: The maximum latency of a record, measured by comparing the record timestamp with the system time (i.e. wallclock) when it has been received by the Sink task right after consumer poll and before conversions.
  • sink-record-latency-avg-ms: The average latency of a record, measured by comparing the record timestamp with the system time (i.e. wallclock) when it has been received by the Sink task right after consumer poll and before conversions.

convert-sink-record-time (INFO)

  • convert-sink-record-time-avg-ms: The average time taken by this task to convert sink records, including key, value, and headers conversion.
  • convert-sink-record-time-max-ms: The maximum time taken by this task to convert sink records, including key, value, and headers conversion.

convert-sink-record-key-time (DEBUG)

  • convert-sink-record-key-time-avg-ms: The average time taken by this task to convert sink record keys.
  • convert-sink-record-key-time-max-ms: The maximum time taken by this task to convert sink record keys.

convert-sink-record-value-time (DEBUG)

  • convert-sink-record-value-time-avg-ms: The average time taken by this task to convert sink record values.
  • convert-sink-record-value-time-max-ms: The maximum time taken by this task to convert sink record values.

convert-sink-record-headers-time (DEBUG)

  • convert-sink-record-headers-time-avg-ms: The average time taken by this task to convert sink record headers.
  • convert-sink-record-headers-time-max-ms: The maximum time taken by this task to convert sink record headers.

transform-chain-sink-record-time (INFO)

  • transform-chain-sink-record-time-avg-ms: The average time taken by this task to apply all the transforms included in this task.
  • transform-chain-sink-record-time-max-ms: The maximum time taken by this task to apply all the transforms included in this task.


kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"

Sensor (Recording Level)
  • Attribute name: Description

convert-source-record-time (INFO)

  • convert-source-record-time-avg-ms: The average time taken by this task to convert source records, including key, value, and headers conversion.
  • convert-source-record-time-max-ms: The maximum time taken by this task to convert source records, including key, value, and headers conversion.

convert-source-record-key-time (DEBUG)

  • convert-source-record-key-time-avg-ms: The average time taken by this task to convert source record keys.
  • convert-source-record-key-time-max-ms: The maximum time taken by this task to convert source record keys.

convert-source-record-value-time (DEBUG)

  • convert-source-record-value-time-avg-ms: The average time taken by this task to convert source record values.
  • convert-source-record-value-time-max-ms: The maximum time taken by this task to convert source record values.

convert-source-record-headers-time (DEBUG)

  • convert-source-record-headers-time-avg-ms: The average time taken by this task to convert source record headers.
  • convert-source-record-headers-time-max-ms: The maximum time taken by this task to convert source record headers.

transform-chain-source-record-time (INFO)

  • transform-chain-source-record-time-avg-ms: The average time taken by this task to apply all the transforms included in this task.
  • transform-chain-source-record-time-max-ms: The maximum time taken by this task to apply all the transforms included in this task.


kafka.connect:type=sink-task-transform-metrics,connector="{connector}",alias="{transform_alias}",task="{task}"

Sensor (Recording Level)
  • Attribute name: Description

transform-sink-record-time (?) (DEBUG)

  • transform-sink-record-time-avg-ms: The average time taken by this task to apply the specific transform identified by the alias.
  • transform-sink-record-time-max-ms: The maximum time taken by this task to apply the specific transform identified by the alias.


kafka.connect:type=source-task-transform-metrics,connector="{connector}",alias="{transform_alias}",task="{task}"

Sensor (Recording Level)
  • Attribute name: Description

transform-source-record-time (?) (DEBUG)

  • transform-source-record-time-avg-ms: The average time taken by this task to apply the specific transform identified by the alias.
  • transform-source-record-time-max-ms: The maximum time taken by this task to apply the specific transform identified by the alias.


More granular metrics are recorded at DEBUG level to avoid performance impact.
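The effect of the recording level can be pictured with a simplified model. SketchLevel and GatedSensor below are hypothetical stand-ins written for this example and are not Kafka's metrics classes; the sketch only assumes that a sensor registered at DEBUG is skipped when the configured level is INFO (in Kafka this is governed by the metrics.recording.level setting).

```java
// Hypothetical recording levels, ordered from least to most verbose.
enum SketchLevel { INFO, DEBUG }

// Hypothetical sensor that only records when the configured level is verbose enough.
class GatedSensor {
    private final String name;
    private final SketchLevel sensorLevel;
    private final SketchLevel configuredLevel;
    private long count;
    private double maxMs;

    GatedSensor(String name, SketchLevel sensorLevel, SketchLevel configuredLevel) {
        this.name = name;
        this.sensorLevel = sensorLevel;
        this.configuredLevel = configuredLevel;
    }

    // A DEBUG sensor is skipped when the configured level is INFO.
    boolean shouldRecord() {
        return sensorLevel.ordinal() <= configuredLevel.ordinal();
    }

    void record(double valueMs) {
        if (!shouldRecord()) {
            return; // no bookkeeping cost beyond this check
        }
        count++;
        maxMs = Math.max(maxMs, valueMs);
    }

    String name() { return name; }
    long count() { return count; }
    double maxMs() { return maxMs; }
}
```

With the configured level left at INFO, only the chain-level and convert-level sensors accumulate values; the per-key, per-value, per-headers and per-transform sensors become active only when the level is raised to DEBUG. Callers can also check shouldRecord() first to avoid reading the clock at all for disabled sensors.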


The following TransformationChain and ConnectorConfig APIs will change so that the transform alias is retained and can be used when recording metrics:

ConnectorConfig:

Code Block
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> LinkedHashMap<String, Transformation<R>> transformations() {

TransformationChain:

Code Block
-    public TransformationChain(List<Transformation<R>> transformations, RetryWithToleranceOperator retryWithToleranceOperator) {
+    public TransformationChain(LinkedHashMap<String, Transformation<R>> transformations, RetryWithToleranceOperator retryWithToleranceOperator) {
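To illustrate why an insertion-ordered map keyed by alias is useful here, the following sketch applies transforms in configuration order and records a per-alias maximum time. AliasedChainSketch is a hypothetical class, records are modelled as strings, and the real TransformationChain additionally involves the RetryWithToleranceOperator, which is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch of a transform chain keyed by alias; not the Kafka Connect implementation.
class AliasedChainSketch {
    // LinkedHashMap iterates in insertion order, so transforms run in configured order.
    private final LinkedHashMap<String, UnaryOperator<String>> transforms;
    private final Map<String, Double> maxMsByAlias = new LinkedHashMap<>();

    AliasedChainSketch(LinkedHashMap<String, UnaryOperator<String>> transforms) {
        this.transforms = transforms;
    }

    String apply(String record) {
        String current = record;
        for (Map.Entry<String, UnaryOperator<String>> entry : transforms.entrySet()) {
            long start = System.nanoTime();
            current = entry.getValue().apply(current);
            double elapsedMs = (System.nanoTime() - start) / 1_000_000.0;
            // The alias is available as the map key, so it can serve as the metric label.
            maxMsByAlias.merge(entry.getKey(), elapsedMs, (a, b) -> Math.max(a, b));
            if (current == null) {
                break; // a transform returned null, so the record is dropped
            }
        }
        return current;
    }

    Map<String, Double> maxMsByAlias() { return maxMsByAlias; }
}
```

A plain List of transformations carries no alias, which is the gap the signature change addresses; LinkedHashMap keeps both the alias and the configured ordering.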


Compatibility, Deprecation, and Migration Plan


Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.