...

An important part of deploying Kafka Connect is monitoring the health of the workers in a cluster and the connectors and tasks that have been deployed to the cluster. The Kafka Connect framework only has a few metrics capturing the number of connectors and tasks for each worker, so we propose to add metrics to monitor more information about the connectors, tasks, and workers. This proposal expressly avoids changes to the Connect API, and therefore does not address how connector implementations can define their own connector-specific metrics.

All metrics reported by each worker are scoped by the activities within that worker. Kafka Connect does not have any existing mechanism to aggregate the metrics reported by each worker, and therefore any such aggregation is out of scope for this KIP.

Public Interfaces

All of the following will be added via Kafka's metrics library like most of the metrics in the Kafka brokers and other components.

The scope of all metrics is limited to the worker where the metrics are being reported, and all metric names include the name of the worker in the MBean attribute.

Source Task Metrics

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| source record produce rate | The number of records per second produced (before transformation) by this task belonging to the named source connector in this worker | kafka.connect:type=source-task-metrics,name=source-record-produce-rate,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| source record produce total | The total number of records produced (before transformation) by this task belonging to the named source connector in this worker | kafka.connect:type=source-task-metrics,name=source-record-produce-total,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| source record write rate | The number of records per second output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformation and excludes any records filtered out by the transformations. | kafka.connect:type=source-task-metrics,name=source-record-write-rate,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| source record write total | The total number of records output from the transformations and written to Kafka by this task belonging to the named source connector in this worker. This is after transformation and excludes any records filtered out by the transformations. | kafka.connect:type=source-task-metrics,name=source-record-write-total,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| poll time percentage | The average percentage of time spent polling this task belonging to the named source connector in this worker | kafka.connect:type=source-task-metrics,name=poll-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| transform time percentage | The average percentage of time spent transforming source records for this task belonging to the named source connector in this worker | kafka.connect:type=source-task-metrics,name=transform-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |

...

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| write time percentage | The average percentage of time spent converting and writing source records for this task belonging to the named source connector in this worker | kafka.connect:type=source-task-metrics,name=write-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| pause time percentage | The average percentage of time this task belonging to the named source connector in this worker was paused | kafka.connect:type=source-task-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
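The MBean attribute patterns listed for the source task metrics are regular expressions whose capture groups are the worker name, the connector name, and the task number. As a rough sketch of how a monitoring script might use one, the following Python compiles the source task pattern and splits a metric name into its scoping fields. The named groups, the extra `name` group, the function name, and the sample worker and connector names are assumptions made for illustration and are not part of this proposal.

```python
import re

# Pattern adapted from the source-task-metrics rows. Named groups are an
# illustrative addition; the proposal only specifies the unnamed groups.
SOURCE_TASK_PATTERN = re.compile(
    r"kafka\.connect:type=source-task-metrics,"
    r"name=(?P<name>[-\w]+),"
    r"worker=(?P<worker>[-.\w]+),"
    r"connector=(?P<connector>[-.\w]+),"
    r"task=(?P<task>[\d]+)"
)


def parse_source_task_mbean(mbean: str) -> dict:
    """Split a source task MBean attribute into its scoping fields."""
    match = SOURCE_TASK_PATTERN.fullmatch(mbean)
    if match is None:
        raise ValueError(f"not a source task metric: {mbean!r}")
    fields = match.groupdict()
    fields["task"] = int(fields["task"])  # task identifiers are numeric
    return fields


# Hypothetical worker and connector names, for illustration only.
example = (
    "kafka.connect:type=source-task-metrics,name=poll-time-percentage,"
    "worker=worker-1.example.com,connector=jdbc-source,task=0"
)
print(parse_source_task_mbean(example))
```

The same approach should carry over to the connector-level and worker-level patterns by dropping the `task` and `connector` groups respectively.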


Source Connector Metrics

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| connector class | The name of the connector class | kafka.connect:type=source-connector-metrics,name=connector-class,worker=([-.\w]+),connector=([-.\w]+) |
| connector version | The version of the connector class, as reported by the connector in this worker | kafka.connect:type=source-connector-metrics,name=connector-version,worker=([-.\w]+),connector=([-.\w]+) |
| source record rate | The number of records produced per second by all tasks belonging to the named source connector in this worker | kafka.connect:type=source-connector-metrics,name=source-record-rate,worker=([-.\w]+),connector=([-.\w]+) |
| source record total | The total number of records produced by all tasks belonging to the named source connector in this worker | kafka.connect:type=source-connector-metrics,name=source-record-total,worker=([-.\w]+),connector=([-.\w]+) |
| poll time percentage | The average percentage of time this worker spent polling all tasks belonging to the named source connector | kafka.connect:type=source-connector-metrics,name=poll-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
| transform time percentage | The average percentage of time this worker spent transforming source records for all tasks belonging to the named source connector | kafka.connect:type=source-connector-metrics,name=transform-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
| write time percentage | The average percentage of time this worker spent converting and writing source records for all tasks belonging to the named source connector | kafka.connect:type=source-connector-metrics,name=write-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
| pause time percentage | The average percentage of time all tasks in this worker belonging to the named source connector were paused | kafka.connect:type=source-connector-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
| status | The current status of the connector in this worker, one of: running, paused, stopped | kafka.connect:type=source-connector-metrics,name=status,worker=([-.\w]+),connector=([-.\w]+) |

 

Sink Task Metrics

 

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| sink record read rate | The number of records per second read from Kafka for this task belonging to the named sink connector in this worker. This includes all records passed to the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-read-rate,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| sink record read total | The total number of records read from Kafka for this task belonging to the named sink connector in this worker. This includes all records passed to the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-read-total,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| sink record process rate | The number of records per second output from the transformations and sent to this task belonging to the named sink connector in this worker. This is after transformation and excludes any records filtered out by the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-process-rate,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |

...

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| sink record process total | The total number of records output from the transformations and sent to this task belonging to the named sink connector in this worker. This is after transformation and excludes any records filtered out by the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-process-total,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| read time percentage | The average percentage of time spent polling this task belonging to the named sink connector in this worker | kafka.connect:type=sink-task-metrics,name=read-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| transform time percentage | The average percentage of time spent transforming sink records for this task belonging to the named sink connector in this worker | kafka.connect:type=sink-task-metrics,name=transform-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| put time percentage | The average percentage of time this task belonging to the named sink connector in this worker spent putting/processing sink records | kafka.connect:type=sink-task-metrics,name=put-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| flush time percentage | The average percentage of time this task belonging to the named sink connector in this worker spent flushing sink records | kafka.connect:type=sink-task-metrics,name=flush-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
| pause time percentage | The average percentage of time this task belonging to the named sink connector in this worker was paused | kafka.connect:type=sink-task-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
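The proposal does not spell out how the time percentage metrics are computed. One plausible reading, sketched below in Python, is that each value is the share of a measurement window that the task spent in a given phase. The function name, the phase keys, and the sample durations are hypothetical and chosen only to illustrate the arithmetic.

```python
def time_percentages(phase_millis: dict, window_millis: float) -> dict:
    """Return the percentage of the window spent in each phase.

    Assumption: a "time percentage" is time-in-phase divided by the
    length of the measurement window. The proposal does not define this.
    """
    if window_millis <= 0:
        raise ValueError("window_millis must be positive")
    return {
        phase: 100.0 * millis / window_millis
        for phase, millis in phase_millis.items()
    }


# Hypothetical time (in milliseconds) a sink task spent in each phase
# during a 10-second window.
sample = {
    "read": 1500.0,
    "transform": 300.0,
    "put": 6000.0,
    "flush": 1200.0,
    "pause": 0.0,
}
percentages = time_percentages(sample, 10_000.0)
print(percentages)
```

Under this reading the phases need not add up to 100, since a task can also be idle or doing work that none of the listed metrics covers.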


Sink Connector Metrics

 

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| connector class | The name of the connector class | kafka.connect:type=sink-connector-metrics,name=connector-class,worker=([-.\w]+),connector=([-.\w]+) |
| connector version | The version of the connector class, as reported by the connector in this worker | kafka.connect:type=sink-connector-metrics,name=connector-version,worker=([-.\w]+),connector=([-.\w]+) |

...

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| sink record rate | The number of records consumed per second in this worker by all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=sink-record-rate,worker=([-.\w]+),connector=([-.\w]+) |
| sink record total | The total number of records consumed in this worker by all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=sink-record-total,worker=([-.\w]+),connector=([-.\w]+) |
| read time percentage | The average percentage of time this worker spent polling all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=read-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
| transform time percentage | The average percentage of time this worker spent transforming sink records for all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=transform-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
| put time percentage | The average percentage of time all tasks belonging to the named sink connector in this worker spent putting/processing sink records | kafka.connect:type=sink-connector-metrics,name=put-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
| flush time percentage | The average percentage of time all tasks belonging to the named sink connector in this worker spent flushing sink records | kafka.connect:type=sink-connector-metrics,name=flush-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
| pause time percentage | The average percentage of time all tasks belonging to the named sink connector in this worker were paused | kafka.connect:type=sink-connector-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
| status | The current status of the connector in this worker, one of: running, paused, stopped | kafka.connect:type=sink-connector-metrics,name=status,worker=([-.\w]+),connector=([-.\w]+) |
| partition count | The number of topic partitions assigned to all tasks running in this worker for the named sink connector | kafka.connect:type=sink-connector-metrics,name=partition-count,worker=([-.\w]+),connector=([-.\w]+) |


Worker Metrics

 

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| assigned tasks | The number of tasks run in this worker (copy of existing metric) | kafka.connect:type=connect-worker-metrics,name=assigned-tasks,worker=([-.\w]+) |
| assigned connectors | The number of connectors run in this worker (copy of existing metric) | kafka.connect:type=connect-worker-metrics,name=assigned-connectors,worker=([-.\w]+) |
| task count | The number of tasks run in this worker | kafka.connect:type=connect-worker-metrics,name=task-count,worker=([-.\w]+) |
| connector count | The number of connectors run in this worker | kafka.connect:type=connect-worker-metrics,name=connector-count,worker=([-.\w]+) |
| sink record rate | The number of sink records consumed per second by all sink connectors in this worker | kafka.connect:type=connect-worker-metrics,name=sink-record-rate,worker=([-.\w]+) |
| sink record total | The total number of sink records consumed by all sink connectors in this worker | kafka.connect:type=connect-worker-metrics,name=sink-record-total,worker=([-.\w]+) |
| source record rate | The number of source records produced per second by all source connectors in this worker | kafka.connect:type=connect-worker-metrics,name=source-record-rate,worker=([-.\w]+) |
| source record total | The total number of source records produced by all source connectors in this worker | kafka.connect:type=connect-worker-metrics,name=source-record-total,worker=([-.\w]+) |
| leader name | The name of the group leader | kafka.connect:type=connect-worker-metrics,name=leader-name,worker=([-.\w]+) |

...

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| state | The state of this worker, one of: rebalancing, running | kafka.connect:type=connect-worker-metrics,name=state,worker=([-.\w]+) |
| offset commit success total | The total number of this worker's successful offset commits | kafka.connect:type=connect-worker-metrics,name=offset-commit-success-total,worker=([-.\w]+) |
| offset commit success percentage | The average percentage of this worker's offset commits that succeeded | kafka.connect:type=connect-worker-metrics,name=offset-commit-success-percentage,worker=([-.\w]+) |
| offset commit failure total | The total number of this worker's failed offset commits | kafka.connect:type=connect-worker-metrics,name=offset-commit-failure-total,worker=([-.\w]+) |
| offset commit failure percentage | The average percentage of this worker's offset commits that failed | kafka.connect:type=connect-worker-metrics,name=offset-commit-failure-percentage,worker=([-.\w]+) |
| offset commit total | The total number of this worker's offset commits | kafka.connect:type=connect-worker-metrics,name=offset-commit-total,worker=([-.\w]+) |
| offset commit maximum time | The maximum time this worker spent to commit offsets | kafka.connect:type=connect-worker-metrics,name=offset-commit-max-time,worker=([-.\w]+) |
| offset commit 99th percentile time | The 99th percentile time spent by this worker to commit offsets during the last window (defaults to an hour) | kafka.connect:type=connect-worker-metrics,name=offset-commit-99p-time,worker=([-.\w]+) |
| offset commit 95th percentile time | The 95th percentile time spent by this worker to commit offsets during the last window (defaults to an hour) | kafka.connect:type=connect-worker-metrics,name=offset-commit-95p-time,worker=([-.\w]+) |
| offset commit 90th percentile time | The 90th percentile time spent by this worker to commit offsets during the last window (defaults to an hour) | kafka.connect:type=connect-worker-metrics,name=offset-commit-90p-time,worker=([-.\w]+) |
| offset commit 75th percentile time | The 75th percentile time spent by this worker to commit offsets during the last window (defaults to an hour) | kafka.connect:type=connect-worker-metrics,name=offset-commit-75p-time,worker=([-.\w]+) |
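To make the relationship between the offset commit totals, percentages, and percentile times concrete, here is a minimal Python sketch. It assumes that the percentages are simple ratios of the success and failure totals and that percentiles use the nearest-rank method over the commit durations seen in the window. Neither assumption is stated in this proposal, Kafka's metrics library may compute these values differently, and all function names and sample values are hypothetical.

```python
import math


def commit_percentages(success_total: int, failure_total: int) -> tuple:
    """Return (success %, failure %) of all offset commits.

    Assumption: the percentages are plain ratios of the two totals.
    """
    total = success_total + failure_total
    if total == 0:
        return 0.0, 0.0
    return 100.0 * success_total / total, 100.0 * failure_total / total


def percentile(samples: list, p: int) -> float:
    """Nearest-rank percentile of the commit durations in one window."""
    if not samples:
        raise ValueError("no samples in window")
    ordered = sorted(samples)
    # Rank is 1-based; clamp to at least 1 so p=0 returns the minimum.
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


# Hypothetical commit durations (milliseconds) observed in one window.
durations = [12, 15, 11, 40, 13, 14, 18, 16, 17, 90]
print(commit_percentages(95, 5))
print({p: percentile(durations, p) for p in (75, 90, 95, 99)})
print(max(durations))  # corresponds to "offset commit maximum time"
```

The percentile metrics are most useful next to the maximum: a single slow commit moves the maximum and the 99th percentile but leaves the 75th percentile largely unchanged.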


Worker Rebalance Metrics

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| rebalance success total | The total number of this worker's successful rebalances | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-success-total,worker=([-.\w]+) |
| rebalance success percentage | The average percentage of this worker's rebalances that succeeded | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-success-percentage,worker=([-.\w]+) |
| rebalance failure total | The total number of this worker's failed rebalances | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-failure-total,worker=([-.\w]+) |
| rebalance failure percentage | The average percentage of this worker's rebalances that failed | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-failure-percentage,worker=([-.\w]+) |
| rebalance maximum time | The maximum time spent by this worker to rebalance | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-max-time,worker=([-.\w]+) |
| rebalance 99th percentile time | The 99th percentile time spent by this worker to rebalance during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-99p-time,worker=([-.\w]+) |
| rebalance 95th percentile time | The 95th percentile time spent by this worker to rebalance during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-95p-time,worker=([-.\w]+) |
| rebalance 90th percentile time | The 90th percentile time spent by this worker to rebalance during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-90p-time,worker=([-.\w]+) |
| rebalance 75th percentile time | The 75th percentile time spent by this worker to rebalance during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-75p-time,worker=([-.\w]+) |
| time since last rebalance | The time since the most recent rebalance in this worker | kafka.connect:type=connect-worker-rebalance-metrics,name=time-since-last-rebalance,worker=([-.\w]+) |
| task failure rate | The number of tasks that failed in this worker | kafka.connect:type=connect-worker-rebalance-metrics,name=task-failure-rate,worker=([-.\w]+) |


Worker REST Metrics

| Metric Name | Description | MBean attribute |
| --- | --- | --- |
| REST request rate | The number of requests handled by the REST endpoints in this worker | kafka.connect:type=worker-rest-metrics,name=request-rate,worker=([-.\w]+) |

 

Proposed Changes

We will add the relevant metrics as specified in the Public Interfaces section. The scope of each metric is limited to a single worker. Because the worker name is included in every MBean attribute, these metrics can be aggregated across workers by external tooling.
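Aggregation across workers is left to whatever monitoring system collects the metrics. As a rough illustration of what that could look like, the Python sketch below sums the per-worker `source-record-total` readings for each connector by parsing the worker and connector names out of the MBean attribute. The function name, the shape of the `readings` mapping, and the sample worker names, connector names, and values are all hypothetical.

```python
import re
from collections import defaultdict

# Pattern for the connector-level source-record-total metric; the named
# groups are an illustrative addition to the pattern in this proposal.
SOURCE_RECORD_TOTAL = re.compile(
    r"kafka\.connect:type=source-connector-metrics,"
    r"name=source-record-total,"
    r"worker=(?P<worker>[-.\w]+),"
    r"connector=(?P<connector>[-.\w]+)"
)


def total_records_by_connector(readings: dict) -> dict:
    """Sum per-worker source-record-total values for each connector."""
    totals = defaultdict(float)
    for mbean, value in readings.items():
        match = SOURCE_RECORD_TOTAL.fullmatch(mbean)
        if match is not None:  # ignore every other metric
            totals[match.group("connector")] += value
    return dict(totals)


# Hypothetical readings collected from two workers.
prefix = "kafka.connect:type=source-connector-metrics,name=source-record-total"
readings = {
    f"{prefix},worker=worker-a,connector=orders": 1200.0,
    f"{prefix},worker=worker-b,connector=orders": 800.0,
    f"{prefix},worker=worker-a,connector=users": 50.0,
    "kafka.connect:type=connect-worker-metrics,name=task-count,worker=worker-a": 3.0,
}
print(total_records_by_connector(readings))
```

Totals and counts can be summed this way, but rates and percentages generally need a weighted combination, which is one reason aggregation is kept outside the workers.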

Compatibility, Deprecation, and Migration Plan

We are introducing new metrics, so there is no compatibility impact. Two existing metrics will not be changed. However, because the new metrics use a slightly different pattern for the MBean attributes so that they always include the worker name, new metrics will be added to replicate these two existing metrics with the new naming pattern.

Rejected Alternatives

None