...
An important part of deploying Kafka Connect is monitoring the health of the workers in a cluster and the connectors and tasks that have been deployed to the cluster. The Kafka Connect framework only has a few metrics capturing the number of connectors and tasks for each worker, so we propose to add metrics to monitor more information about the connectors, tasks, and workers. This proposal expressly avoids changes to the Connect API, and therefore does not address how connector implementations can define their own connector-specific metrics.
All metrics reported by each worker are scoped by the activities within that worker. Kafka Connect does not have any existing mechanism to aggregate the metrics reported by each worker, and therefore any such aggregation is out of scope for this KIP.
Public Interfaces
All of the following metrics will be added using Kafka's metrics library, like most of the metrics in the Kafka brokers and other components.
The scope of all metrics is limited to the worker where the metrics are being reported, and all metric names include the name of the worker in the MBean attribute.
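The MBean attribute patterns in the tables below are regular expressions whose capture groups match the worker, connector, and task names. The following is a minimal Python sketch of how a monitoring script might pull those fields out of a source task MBean name. It assumes the names arrive as plain strings. The helper `parse_task_mbean`, the generalized `name=([-\w]+)` group, and the sample worker and connector names are all hypothetical.

```python
import re

# Generalized from the source task MBean patterns in the tables below:
# the metric name is captured as well, and the worker, connector, and task
# groups reuse the character classes from the proposal.
SOURCE_TASK_MBEAN = re.compile(
    r"kafka\.connect:type=source-task-metrics,name=([-\w]+),"
    r"worker=([-.\w]+),connector=([-.\w]+),task=([\d]+)"
)


def parse_task_mbean(mbean_name):
    """Hypothetical helper: return (metric, worker, connector, task) or None."""
    match = SOURCE_TASK_MBEAN.fullmatch(mbean_name)
    if match is None:
        return None
    metric, worker, connector, task = match.groups()
    return metric, worker, connector, int(task)


example = (
    "kafka.connect:type=source-task-metrics,name=poll-time-percentage,"
    "worker=worker-1.example.com,connector=jdbc-source,task=3"
)
print(parse_task_mbean(example))
# ('poll-time-percentage', 'worker-1.example.com', 'jdbc-source', 3)
```

JMX does not treat the order of key properties in an ObjectName as significant. A production tool would therefore be more robust if it compared parsed key/value pairs rather than matching the whole string.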
Source Task Metrics
Metric Name | Description | MBean attribute |
---|---|---|
source record produce rate | The number of records per second produced (before transformation) by this task belonging to the named source connector in this worker | kafka.connect:type=source-task-metrics,name=source-record-produce-rate,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
source record produce total | The total number of records produced (before transformation) by this task belonging to the named source connector in this worker | kafka.connect:type=source-task-metrics,name=source-record-produce-total,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
source record write rate | The number of records per second output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformation and excludes any records filtered out by the transformations. | kafka.connect:type=source-task-metrics,name=source-record-write-rate,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
source record write total | The total number of records output from the transformations and written to Kafka by this task belonging to the named source connector in this worker. This is after transformation and excludes any records filtered out by the transformations. | kafka.connect:type=source-task-metrics,name=source-record-write-total,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
poll time percentage | The average percentage of time spent polling this task belonging to the named source connector in this worker | kafka.connect:type=source-task-metrics,name=poll-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
transform time percentage | The average percentage of time spent transforming source records for this task belonging to the named source connector in this worker | kafka.connect:type=source-task-metrics,name=transform-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
write time percentage | The average percentage of time spent converting and writing source records for this task belonging to the named source connector in this worker | kafka.connect:type=source-task-metrics,name=write-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
pause time percentage | The average percentage of time this task belonging to the named source connector in this worker was paused | kafka.connect:type=source-task-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
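The produce metrics count records before transformation, and the write metrics count only records that survive the transformations. A Connect transformation returns at most one record for each input record, so the difference between the two totals for the same task estimates how many records were dropped. The following is a minimal sketch that assumes both totals were read at the same moment. Records that have been polled but not yet written also show up in the difference. The helper names are hypothetical.

```python
def filtered_records(produce_total, write_total):
    """Estimate records dropped between polling and writing for one source task.

    produce_total: value of source-record-produce-total (before transformation)
    write_total:   value of source-record-write-total (after transformation)
    Records that were polled but not yet written are also counted here.
    """
    if write_total > produce_total:
        raise ValueError("write total should not exceed produce total")
    return produce_total - write_total


def filtered_fraction(produce_total, write_total):
    """Fraction of polled records that were not written (0.0 if nothing was polled)."""
    if produce_total == 0:
        return 0.0
    return filtered_records(produce_total, write_total) / produce_total


print(filtered_records(10_000, 9_250))   # 750
print(filtered_fraction(10_000, 9_250))  # 0.075
```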
Source Connector Metrics
Metric Name | Description | MBean attribute |
---|---|---|
connector class | The name of the connector class | kafka.connect:type=source-connector-metrics,name=connector-class,worker=([-.\w]+),connector=([-.\w]+) |
connector version | The version of the connector class, as reported by the connector in this worker | kafka.connect:type=source-connector-metrics,name=connector-version,worker=([-.\w]+),connector=([-.\w]+) |
source record rate | The number of records produced per second by all tasks belonging to the named source connector in this worker | kafka.connect:type=source-connector-metrics,name=source-record-rate,worker=([-.\w]+),connector=([-.\w]+) |
source record total | The total number of records produced by all tasks belonging to the named source connector in this worker | kafka.connect:type=source-connector-metrics,name=source-record-total,worker=([-.\w]+),connector=([-.\w]+) |
poll time percentage | The average percentage of time this worker spent polling all tasks belonging to the named source connector | kafka.connect:type=source-connector-metrics,name=poll-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
transform time percentage | The average percentage of time this worker spent transforming source records for all tasks belonging to the named source connector | kafka.connect:type=source-connector-metrics,name=transform-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
write time percentage | The average percentage of time this worker spent converting and writing source records for all tasks belonging to the named source connector | kafka.connect:type=source-connector-metrics,name=write-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
pause time percentage | The average percentage of time all tasks in this worker belonging to the named source connector were paused | kafka.connect:type=source-connector-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
status | The current status of the connector in this worker, one of: running, paused, stopped | kafka.connect:type=source-connector-metrics,name=status,worker=([-.\w]+),connector=([-.\w]+) |
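Each connector-level time percentage summarizes all of the connector's tasks in one worker. The proposal does not say how the per-task values are combined. The following is a minimal sketch that assumes an unweighted mean across tasks. The helper name is hypothetical.

```python
def connector_time_percentage(task_percentages):
    """Combine per-task time percentages into one connector-level value.

    task_percentages maps task id -> percentage (0-100) for the tasks of one
    connector in this worker. Uses an unweighted mean, which is an assumption.
    """
    if not task_percentages:
        return 0.0
    return sum(task_percentages.values()) / len(task_percentages)


poll_pct = {0: 40.0, 1: 70.0, 2: 10.0}  # task id -> poll time percentage
print(connector_time_percentage(poll_pct))  # 40.0
print(sum(poll_pct.values()))               # 120.0, which is why a plain sum is not used
```

Averaging keeps the result between 0 and 100 even when several tasks run in parallel. Summing the per-task values can exceed 100.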
Sink Task Metrics
Metric Name | Description | MBean attribute |
---|---|---|
sink record read rate | The number of records per second read from Kafka for this task belonging to the named sink connector in this worker. This includes all records passed to the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-read-rate,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
sink record read total | The total number of records read from Kafka for this task belonging to the named sink connector in this worker. This includes all records passed to the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-read-total,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
sink record process rate | The number of records per second output from the transformations and sent to this task belonging to the named sink connector in this worker. This is after transformation and excludes any records filtered out by the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-process-rate,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
sink record process total | The total number of records output from the transformations and sent to this task belonging to the named sink connector in this worker. This is after transformation and excludes any records filtered out by the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-process-total,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
read time percentage | The average percentage of time spent reading records from Kafka for this task belonging to the named sink connector in this worker | kafka.connect:type=sink-task-metrics,name=read-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
transform time percentage | The average percentage of time spent transforming sink records for this task belonging to the named sink connector in this worker | kafka.connect:type=sink-task-metrics,name=transform-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
put time percentage | The average percentage of time this task belonging to the named sink connector in this worker spent putting/processing sink records | kafka.connect:type=sink-task-metrics,name=put-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
flush time percentage | The average percentage of time this task belonging to the named sink connector in this worker spent flushing sink records | kafka.connect:type=sink-task-metrics,name=flush-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
pause time percentage | The average percentage of time this task belonging to the named sink connector in this worker was paused | kafka.connect:type=sink-task-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
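The sink task time percentages divide a task's time among its phases. The following is a minimal sketch that assumes the time each phase took during one measurement window is already known. The proposal does not describe how the framework measures phases, and the helper name and sample durations are hypothetical.

```python
def time_percentages(phase_ms, window_ms):
    """Convert per-phase durations (ms) measured over one window into percentages.

    phase_ms maps a phase name (read, transform, put, flush, pause) to the
    milliseconds the task spent in that phase during the window.
    """
    if window_ms <= 0:
        raise ValueError("window must be positive")
    return {phase: 100.0 * ms / window_ms for phase, ms in phase_ms.items()}


phases = {"read": 1500.0, "transform": 500.0, "put": 6000.0, "flush": 1000.0, "pause": 0.0}
pct = time_percentages(phases, window_ms=10_000.0)
print(pct)
# {'read': 15.0, 'transform': 5.0, 'put': 60.0, 'flush': 10.0, 'pause': 0.0}
print(100.0 - sum(pct.values()))  # 10.0 (time not attributed to any phase)
```

The percentages need not add up to 100. The remainder is time that none of the listed phases accounts for.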
Sink Connector Metrics
Metric Name | Description | MBean attribute |
---|---|---|
connector class | The name of the connector class | kafka.connect:type=sink-connector-metrics,name=connector-class,worker=([-.\w]+),connector=([-.\w]+) |
connector version | The version of the connector class, as reported by the connector in this worker | kafka.connect:type=sink-connector-metrics,name=connector-version,worker=([-.\w]+),connector=([-.\w]+) |
sink record rate | The number of records consumed per second in this worker by all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=sink-record-rate,worker=([-.\w]+),connector=([-.\w]+) |
sink record total | The total number of records consumed in this worker by all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=sink-record-total,worker=([-.\w]+),connector=([-.\w]+) |
read time percentage | The average percentage of time this worker spent reading records from Kafka for all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=read-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
transform time percentage | The average percentage of time this worker spent transforming sink records for all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=transform-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
put time percentage | The average percentage of time all tasks belonging to the named sink connector in this worker spent putting/processing sink records | kafka.connect:type=sink-connector-metrics,name=put-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
flush time percentage | The average percentage of time all tasks belonging to the named sink connector in this worker spent flushing sink records | kafka.connect:type=sink-connector-metrics,name=flush-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
pause time percentage | The average percentage of time all tasks belonging to the named sink connector in this worker were paused | kafka.connect:type=sink-connector-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
status | The current status of the connector in this worker, one of: running, paused, stopped | kafka.connect:type=sink-connector-metrics,name=status,worker=([-.\w]+),connector=([-.\w]+) |
partition count | The number of topic partitions assigned to all tasks running in this worker for the named sink connector | kafka.connect:type=sink-connector-metrics,name=partition-count,worker=([-.\w]+),connector=([-.\w]+) |
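A monitoring tool that looks up a specific connector-level metric needs the concrete MBean name, not the pattern. The following is a minimal sketch that builds one from the naming scheme above. It rejects worker or connector values that the proposal's `([-.\w]+)` groups would not match. The helper name and sample values are hypothetical.

```python
import re

# Character class used by the proposal's worker and connector patterns.
NAME_VALUE = re.compile(r"[-.\w]+")


def connector_mbean(kind, metric, worker, connector):
    """Hypothetical helper: build a concrete connector-level MBean name.

    kind is "source" or "sink"; metric is the hyphenated metric name,
    for example "partition-count" or "sink-record-rate".
    """
    if kind not in ("source", "sink"):
        raise ValueError(f"unknown connector kind: {kind!r}")
    for value in (worker, connector):
        # Reject values that the proposal's ([-.\w]+) pattern would not match.
        if NAME_VALUE.fullmatch(value) is None:
            raise ValueError(f"value not allowed by the MBean pattern: {value!r}")
    return (
        f"kafka.connect:type={kind}-connector-metrics,name={metric},"
        f"worker={worker},connector={connector}"
    )


print(connector_mbean("sink", "partition-count", "worker-1", "hdfs-sink"))
# kafka.connect:type=sink-connector-metrics,name=partition-count,worker=worker-1,connector=hdfs-sink
```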
Worker Metrics
Metric Name | Description | MBean attribute |
---|---|---|
assigned tasks | The number of tasks run in this worker (copy of existing metric) | kafka.connect:type=connect-worker-metrics,name=assigned-tasks,worker=([-.\w]+) |
assigned connectors | The number of connectors run in this worker (copy of existing metric) | kafka.connect:type=connect-worker-metrics,name=assigned-connectors,worker=([-.\w]+) |
task count | The number of tasks run in this worker | kafka.connect:type=connect-worker-metrics,name=task-count,worker=([-.\w]+) |
connector count | The number of connectors run in this worker | kafka.connect:type=connect-worker-metrics,name=connector-count,worker=([-.\w]+) |
sink record rate | The number of sink records consumed per second by all sink connectors in this worker | kafka.connect:type=connect-worker-metrics,name=sink-record-rate,worker=([-.\w]+) |
sink record total | The total number of sink records consumed by all sink connectors in this worker | kafka.connect:type=connect-worker-metrics,name=sink-record-total,worker=([-.\w]+) |
source record rate | The number of source records produced per second by all source connectors in this worker | kafka.connect:type=connect-worker-metrics,name=source-record-rate,worker=([-.\w]+) |
source record total | The total number of source records produced by all source connectors in this worker | kafka.connect:type=connect-worker-metrics,name=source-record-total,worker=([-.\w]+) |
leader name | The name of the group leader | kafka.connect:type=connect-worker-metrics,name=leader-name,worker=([-.\w]+) |
state | The state of this worker, one of: rebalancing, running | kafka.connect:type=connect-worker-metrics,name=state,worker=([-.\w]+) |
offset commit success total | The total number of this worker's successful offset commits | kafka.connect:type=connect-worker-metrics,name=offset-commit-success-total,worker=([-.\w]+) |
offset commit success percentage | The average percentage of this worker's offset commits that succeeded | kafka.connect:type=connect-worker-metrics,name=offset-commit-success-percentage,worker=([-.\w]+) |
offset commit failure total | The total number of this worker's failed offset commits | kafka.connect:type=connect-worker-metrics,name=offset-commit-failure-total,worker=([-.\w]+) |
offset commit failure percentage | The average percentage of this worker's offset commits that failed | kafka.connect:type=connect-worker-metrics,name=offset-commit-failure-percentage,worker=([-.\w]+) |
offset commit total | The total number of this worker's offset commits | kafka.connect:type=connect-worker-metrics,name=offset-commit-total,worker=([-.\w]+) |
offset commit maximum time | The maximum time this worker spent committing offsets | kafka.connect:type=connect-worker-metrics,name=offset-commit-max-time,worker=([-.\w]+) |
offset commit 99th percentile time | The 99th percentile of the time this worker spent committing offsets during the last window (defaults to an hour) | kafka.connect:type=connect-worker-metrics,name=offset-commit-99p-time,worker=([-.\w]+) |
offset commit 95th percentile time | The 95th percentile of the time this worker spent committing offsets during the last window (defaults to an hour) | kafka.connect:type=connect-worker-metrics,name=offset-commit-95p-time,worker=([-.\w]+) |
offset commit 90th percentile time | The 90th percentile of the time this worker spent committing offsets during the last window (defaults to an hour) | kafka.connect:type=connect-worker-metrics,name=offset-commit-90p-time,worker=([-.\w]+) |
offset commit 75th percentile time | The 75th percentile of the time this worker spent committing offsets during the last window (defaults to an hour) | kafka.connect:type=connect-worker-metrics,name=offset-commit-75p-time,worker=([-.\w]+) |
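The offset commit totals, percentages, maximum, and percentiles can all be derived from the commits observed during one window. The following is a minimal sketch that assumes each commit is recorded as a `(succeeded, duration_ms)` pair. It uses the nearest-rank percentile definition. Kafka's metrics library estimates percentiles from histogram buckets, so the values it reports may differ slightly from this exact calculation. The helper names and sample data are hypothetical.

```python
import math


def nearest_rank_percentile(values, p):
    """Return the p-th percentile (0 < p <= 100) of values by the nearest-rank method."""
    if not values:
        raise ValueError("no observations in the window")
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered) / 100))  # 1-based rank
    return ordered[rank - 1]


def offset_commit_summary(commits):
    """Summarize one window of (succeeded, duration_ms) offset commit observations."""
    total = len(commits)
    successes = sum(1 for succeeded, _ in commits if succeeded)
    durations = [duration for _, duration in commits]
    summary = {
        "offset-commit-total": total,
        "offset-commit-success-total": successes,
        "offset-commit-failure-total": total - successes,
        "offset-commit-success-percentage": 100.0 * successes / total if total else 0.0,
        "offset-commit-failure-percentage": 100.0 * (total - successes) / total if total else 0.0,
    }
    if durations:
        summary["offset-commit-max-time"] = max(durations)
        for p in (99, 95, 90, 75):
            summary[f"offset-commit-{p}p-time"] = nearest_rank_percentile(durations, p)
    return summary


window = [(True, 12), (True, 15), (True, 9), (True, 30), (True, 11),
          (True, 14), (True, 10), (False, 250), (True, 13), (True, 16)]
for name, value in offset_commit_summary(window).items():
    print(name, value)
```

In this sample, the single 250 ms commit dominates the maximum, 99th, and 95th percentile values but not the 90th. Reporting several percentiles alongside the maximum helps separate a one-off slow commit from a generally slow worker.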
Worker Rebalance Metrics
Metric Name | Description | MBean attribute |
---|---|---|
rebalance success total | The total number of this worker's successful rebalances | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-success-total,worker=([-.\w]+) |
rebalance success percentage | The average percentage of this worker's rebalances that succeeded | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-success-percentage,worker=([-.\w]+) |
rebalance failure total | The total number of this worker's failed rebalances | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-failure-total,worker=([-.\w]+) |
rebalance failure percentage | The average percentage of this worker's rebalances that failed | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-failure-percentage,worker=([-.\w]+) |
rebalance maximum time | The maximum time this worker spent rebalancing | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-max-time,worker=([-.\w]+) |
rebalance 99th percentile time | The 99th percentile of the time this worker spent rebalancing during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-99p-time,worker=([-.\w]+) |
rebalance 95th percentile time | The 95th percentile of the time this worker spent rebalancing during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-95p-time,worker=([-.\w]+) |
rebalance 90th percentile time | The 90th percentile of the time this worker spent rebalancing during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-90p-time,worker=([-.\w]+) |
rebalance 75th percentile time | The 75th percentile of the time this worker spent rebalancing during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-75p-time,worker=([-.\w]+) |
time since last rebalance | The time since the most recent rebalance in this worker | kafka.connect:type=connect-worker-rebalance-metrics,name=time-since-last-rebalance,worker=([-.\w]+) |
task failure rate | The number of task failures per second in this worker | kafka.connect:type=connect-worker-rebalance-metrics,name=task-failure-rate,worker=([-.\w]+) |
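Some of the rebalance metrics are simple counters and timestamps that a worker could maintain as rebalances happen. The following is a minimal sketch of the success and failure totals, the success percentage, and the time since the last rebalance. It uses an injectable clock so the example is deterministic. The class name is hypothetical. Whether a failed rebalance attempt should reset "time since last rebalance" is not stated in the proposal, and treating it that way is an assumption.

```python
import time


class RebalanceTracker:
    """Hypothetical tracker for a few of the worker rebalance metrics."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock              # returns seconds; injectable for testing
        self._last_rebalance_s = None
        self.success_total = 0
        self.failure_total = 0

    def record_rebalance(self, succeeded):
        # Assumption: failed attempts also count as the "most recent rebalance".
        self._last_rebalance_s = self._clock()
        if succeeded:
            self.success_total += 1
        else:
            self.failure_total += 1

    def success_percentage(self):
        total = self.success_total + self.failure_total
        return 100.0 * self.success_total / total if total else 0.0

    def time_since_last_rebalance_ms(self):
        """Milliseconds since the most recent rebalance, or None if there was none."""
        if self._last_rebalance_s is None:
            return None
        return (self._clock() - self._last_rebalance_s) * 1000.0


now = [100.0]  # fake clock, in seconds
tracker = RebalanceTracker(clock=lambda: now[0])
tracker.record_rebalance(succeeded=True)
tracker.record_rebalance(succeeded=False)
now[0] = 102.5
print(tracker.success_percentage())            # 50.0
print(tracker.time_since_last_rebalance_ms())  # 2500.0
```

A monotonic clock is used by default because wall-clock adjustments would otherwise make the elapsed time jump or go negative.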
Worker REST Metrics
Metric Name | Description | MBean attribute |
---|---|---|
REST request rate | The number of requests per second handled by the REST endpoints in this worker | kafka.connect:type=worker-rest-metrics,name=request-rate,worker=([-.\w]+) |
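Rate metrics such as the REST request rate report events per second over a recent period rather than an all-time count. The following is a minimal sketch of a sliding-window rate. It assumes event timestamps are recorded in non-decreasing order. Kafka's metrics library computes rates from a small number of time-based samples rather than from individual event timestamps, so this sketch illustrates the idea and is not a model of its exact behaviour. The class name and window length are hypothetical.

```python
from collections import deque


class WindowedRate:
    """Hypothetical helper: events per second over a sliding time window."""

    def __init__(self, window_s):
        if window_s <= 0:
            raise ValueError("window must be positive")
        self.window_s = window_s
        self._timestamps = deque()  # assumes timestamps arrive in non-decreasing order

    def record(self, timestamp_s):
        """Record one event (for example, one handled REST request)."""
        self._timestamps.append(timestamp_s)

    def rate(self, now_s):
        """Drop events that fell out of the window, then average over the window."""
        cutoff = now_s - self.window_s
        while self._timestamps and self._timestamps[0] <= cutoff:
            self._timestamps.popleft()
        return len(self._timestamps) / self.window_s


requests = WindowedRate(window_s=10.0)
for t in (1.0, 2.0, 3.0, 12.0):
    requests.record(t)
print(requests.rate(now_s=12.0))  # 0.2
```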
Proposed Changes
We will add the relevant metrics as specified in the Public Interfaces section. The scope of each metric is limited to a single worker. Because external monitoring systems may aggregate these metrics across workers, the worker name is included in all MBean attributes.
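The worker name in each MBean attribute is what lets an external tool combine the per-worker values without collisions. The following is a minimal sketch of such a tool. It assumes the tool has already scraped every worker's MBeans into a mapping from full MBean name to value, then sums one connector-level metric per connector and counts the reporting workers. The function name and sample data are hypothetical, and this aggregation is outside Kafka Connect itself.

```python
import re
from collections import defaultdict

# Connector-level pattern from the tables above, with the type and metric
# name captured so source and sink connectors can share one parser.
CONNECTOR_MBEAN = re.compile(
    r"kafka\.connect:type=(source|sink)-connector-metrics,name=([-\w]+),"
    r"worker=([-.\w]+),connector=([-.\w]+)"
)


def aggregate_across_workers(scraped, metric):
    """Sum one connector-level metric over all workers that report it.

    scraped maps full MBean names (as collected by a hypothetical external
    monitoring tool) to numeric values. Returns connector -> (sum, worker count).
    """
    sums = defaultdict(float)
    workers = defaultdict(set)
    for mbean_name, value in scraped.items():
        match = CONNECTOR_MBEAN.fullmatch(mbean_name)
        if match is None or match.group(2) != metric:
            continue
        _kind, _metric, worker, connector = match.groups()
        sums[connector] += value
        workers[connector].add(worker)
    return {c: (sums[c], len(workers[c])) for c in sums}


scraped = {
    "kafka.connect:type=source-connector-metrics,name=source-record-total,"
    "worker=w1,connector=jdbc-source": 1200,
    "kafka.connect:type=source-connector-metrics,name=source-record-total,"
    "worker=w2,connector=jdbc-source": 800,
    "kafka.connect:type=source-connector-metrics,name=source-record-rate,"
    "worker=w1,connector=jdbc-source": 4.5,
}
print(aggregate_across_workers(scraped, "source-record-total"))
# {'jdbc-source': (2000.0, 2)}
```

Totals and rates can be summed across workers in this way. The time-percentage metrics should not be summed, because each one is already relative to a single worker.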
Compatibility, Deprecation, and Migration Plan
We are introducing new metrics, so there is no compatibility impact. Two existing metrics (assigned tasks and assigned connectors) will not be changed. However, because the new metrics use slightly different MBean attribute patterns that always include the worker name, new metrics will be added to replicate these two existing metrics with the new naming pattern.
Rejected Alternatives
None