...
An important part of deploying Kafka Connect is monitoring the health of the workers in a cluster and the connectors and tasks that have been deployed to the cluster. Although the producers and consumers used in Kafka Connect can be monitored, the Kafka Connect framework only has a few metrics capturing the number of connectors and tasks for each worker. To augment these existing metrics, we propose to add metrics to monitor more information about the connectors, tasks, and workers. All metrics reported by each worker are scoped by the activities within that worker.
...
All of the following will be added via Kafka's metrics library, like most of the metrics in the Kafka brokers and other components. The context of all metrics is limited to the worker where the metrics are being reported, and all metrics include the worker ID in the MBean attribute (similar to how Kafka producer and consumer metrics include the client ID).
...
Connector Metrics
Metric Name | Description | MBean attribute |
---|
connector-type | The type of the connector, one of: source, sink | kafka.connect:type=connector-metrics,name=connector-type,connector=([-.\w]+) |
connector-class | The name of the connector class | kafka.connect:type=connector-metrics,name=connector-class,connector=([-.\w]+) |
connector-version | The version of the connector class, as reported by the connector | kafka.connect:type=connector-metrics,name=connector-version,connector=([-.\w]+) |
status | The current status of the connector in this worker, one of: running, paused, stopped | kafka.connect:type=connector-metrics,name=status,connector=([-.\w]+) |
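The MBean attribute column gives each metric as a regular expression over the connector name. As a minimal sketch (not part of this proposal), a monitoring script could match connector-level attributes as follows; the helper `parse_connector_mbean` and the connector name `my-connector` are hypothetical.

```python
import re

# Pattern for connector-scoped attributes; the connector name matches [-.\w]+
# exactly as written in the MBean attribute column.
CONNECTOR_MBEAN = re.compile(
    r"kafka\.connect:type=connector-metrics,"
    r"name=(?P<name>[-\w]+),"
    r"connector=(?P<connector>[-.\w]+)"
)


def parse_connector_mbean(mbean):
    """Return the metric name and connector name, or None if there is no match."""
    match = CONNECTOR_MBEAN.fullmatch(mbean)
    return match.groupdict() if match else None


# Hypothetical attribute string for a connector called "my-connector".
print(parse_connector_mbean(
    "kafka.connect:type=connector-metrics,name=status,connector=my-connector"
))
```

Using `fullmatch` rather than `search` rejects strings with trailing keys, so task-level attributes are not mistaken for connector-level ones.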
Common Task Metrics
Metric Name | Description | MBean attribute |
---|
status | The current status of this task, one of: unassigned, running, paused, failed, destroyed | kafka.connect:type=task-metrics,name=status,connector=([-.\w]+),task=([\d]+) |
pause-ratio | The fraction of time this task has spent in the paused state. | kafka.connect:type=task-metrics,name=pause-ratio,connector=([-.\w]+),task=([\d]+) |
offset-commit-success-percentage | The average percentage of this task's offset commit attempts that succeeded | kafka.connect:type=task-metrics,name=offset-commit-success-percentage,connector=([-.\w]+),task=([\d]+) |
offset-commit-failure-percentage | The average percentage of this task's offset commit attempts that failed or had an error | kafka.connect:type=task-metrics,name=offset-commit-failure-percentage,connector=([-.\w]+),task=([\d]+) |
offset-commit-avg-time | The average time taken by this task to commit offsets | kafka.connect:type=task-metrics,name=offset-commit-avg-time,connector=([-.\w]+),task=([\d]+) |
offset-commit-max-time | The maximum time taken by this task to commit offsets | kafka.connect:type=task-metrics,name=offset-commit-max-time,connector=([-.\w]+),task=([\d]+) |
...
batch-size-max | The maximum size of the batches processed by the connector | kafka.connect:type=task-metrics,name=batch-size-max,connector=([-.\w]+),task=([\d]+) |
batch-size-avg | The average size of the batches processed by the connector | kafka.connect:type=task-metrics,name=batch-size-avg,connector=([-.\w]+),task=([\d]+) |
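The proposal does not spell out how the percentage and ratio metrics are derived. The following is only a sketch of one plausible reading, assuming the percentages are on a 0 to 100 scale and that `pause-ratio` is paused time divided by total elapsed time; the function names and inputs are hypothetical.

```python
def commit_percentages(successes, failures):
    """Return (success %, failure %) of offset commit attempts on a 0-100 scale."""
    attempts = successes + failures
    if attempts == 0:
        # No attempts yet: report zero for both rather than dividing by zero.
        return 0.0, 0.0
    return 100.0 * successes / attempts, 100.0 * failures / attempts


def pause_ratio(paused_ms, total_ms):
    """Return the fraction of elapsed time spent in the paused state."""
    return paused_ms / total_ms if total_ms > 0 else 0.0


print(commit_percentages(9, 1))
print(pause_ratio(250, 1000))
```

An actual implementation would presumably compute these over the sampling windows of Kafka's metrics library rather than over the lifetime of the task.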
Source Task Metrics
Metric Name | Description | MBean attribute |
---|
source-record-poll-rate | The average per-second number of records produced/polled by this task belonging to the named source connector in this worker. This is before transformations are applied. | kafka.connect:type=source-task-metrics,name=source-record-poll-rate,connector=([-.\w]+),task=([\d]+) |
source-record-write-rate | The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied. | kafka.connect:type=source-task-metrics,name=source-record-write-rate,connector=([-.\w]+),task=([\d]+) |
Sink Task Metrics
Metric Name | Description | MBean attribute |
---|
sink-record-read-rate | The average per-second number of records read from Kafka for this task belonging to the named sink connector in this worker. This is before transformations are applied. | kafka.connect:type=sink-task-metrics,name=sink-record-read-rate,connector=([-.\w]+),task=([\d]+) |
sink-record-send-rate | The average per-second number of records output from the transformations and sent to this task belonging to the named sink connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-send-rate,connector=([-.\w]+),task=([\d]+) |
sink-record-lag-max | The maximum lag in terms of number of records for any partition in this window | kafka.connect:type=sink-task-metrics,name=sink-record-lag-max,connector=([-.\w]+),task=([\d]+) |
pause time percentage | The average percentage of time all tasks in this worker belonging to the named source connector were paused | kafka.connect:type=source-connector-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
status | The current status of the connector in this worker, one of: running, paused, stopped | kafka.connect:type=source-connector-metrics,name=status,worker=([-.\w]+),connector=([-.\w]+) |
Sink Task Metrics
Metric Name | Description | MBean attribute |
---|
sink record read rate | The number of records per second read from Kafka for this task belonging to the named sink connector in this worker. This includes all records passed to the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-read-rate,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
sink record read total | The total number of records read from Kafka for this task belonging to the named sink connector in this worker. This includes all records passed to the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-read-total,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
sink record process rate | The number of records per second output from the transformations and sent to this task belonging to the named sink connector in this worker. This is after transformation and excludes any records filtered out by the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-process-rate,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
sink record process total | The total number of records output from the transformations and sent to this task belonging to the named sink connector in this worker. This is after transformation and excludes any records filtered out by the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-process-total,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
read time percentage | The average percentage of time spent polling this task belonging to the named sink connector in this worker | kafka.connect:type=sink-task-metrics,name=read-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
transform time percentage | The average percentage of time spent transforming sink records for this task belonging to the named sink connector in this worker | kafka.connect:type=sink-task-metrics,name=transform-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
put time percentage | The average percentage of time this task belonging to the named sink connector in this worker spent putting/processing sink records | kafka.connect:type=sink-task-metrics,name=put-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
flush time percentage | The average percentage of time this task belonging to the named sink connector in this worker spent flushing sink records | kafka.connect:type=sink-task-metrics,name=flush-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
pause time percentage | The average percentage of time this task belonging to the named sink connector in this worker was paused | kafka.connect:type=sink-task-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+),task=([\d]+) |
...
sink-record-{topic}-{partition}.records-lag | The latest lag, in number of records, of the committed offsets behind the consumer for the topic partition. | kafka.connect:type=sink-task-metrics,name=sink-record-{topic}-{partition}-lag,connector=([-.\w]+),task=([\d]+) |
Metric Name | Description | MBean attribute |
---|
connector class | The name of the connector class | kafka.connect:type=sink-connector-metrics,name=connector-class,worker=([-.\w]+),connector=([-.\w]+) |
connector version | The version of the connector class, as reported by the connector in this worker | kafka.connect:type=sink-connector-metrics,name=connector-version,worker=([-.\w]+),connector=([-.\w]+) |
sink record rate | The number of records produced per second in this worker by all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=sink-record-rate,worker=([-.\w]+),connector=([-.\w]+) |
sink record total | The total number of records produced in this worker by all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=sink-record-total,worker=([-.\w]+),connector=([-.\w]+) |
read time percentage | The average percentage of time this worker spent polling all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=read-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
transform time percentage | The average percentage of time this worker spent transforming sink records for all tasks belonging to the named sink connector | kafka.connect:type=sink-connector-metrics,name=transform-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
put time percentage | The average percentage of time all tasks belonging to the named sink connector in this worker spent putting/processing sink records | kafka.connect:type=sink-connector-metrics,name=put-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
flush time percentage | The average percentage of time all tasks belonging to the named sink connector in this worker spent flushing sink records | kafka.connect:type=sink-connector-metrics,name=flush-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
pause time percentage | The average percentage of time all tasks belonging to the named sink connector in this worker were paused | kafka.connect:type=sink-connector-metrics,name=pause-time-percentage,worker=([-.\w]+),connector=([-.\w]+) |
status | The current status of the connector in this worker, one of: running, paused, stopped | kafka.connect:type=sink-connector-metrics,name=status,worker=([-.\w]+),connector=([-.\w]+) |
partition count | The number of topic partitions assigned to all tasks running in this worker for the named sink connector | kafka.connect:type=sink-connector-metrics,name=partition-count,worker=([-.\w]+),connector=([-.\w]+) |
...
Metric Name | Description | MBean attribute |
---|
assigned tasks | The number of tasks run in this worker (copy of existing metric) | kafka.connect:type=connect-worker-metrics,name=assigned-tasks,worker=([-.\w]+) |
assigned connectors | The number of connectors run in this worker (copy of existing metric) | kafka.connect:type=connect-worker-metrics,name=assigned-connectors,worker=([-.\w]+) |
task count | The number of tasks run in this worker | kafka.connect:type=connect-worker-metrics,name=task-count,worker=([-.\w]+) |
connector count | The number of connectors run in this worker | kafka.connect:type=connect-worker-metrics,name=connector-count,worker=([-.\w]+) |
sink record rate | The number of sink records consumed per second by all sink connectors in this worker | kafka.connect:type=connect-worker-metrics,name=sink-record-rate,worker=([-.\w]+) |
sink record total | The total number of sink records consumed by all sink connectors in this worker | kafka.connect:type=connect-worker-metrics,name=sink-record-total,worker=([-.\w]+) |
source record rate | The number of source records produced per second by all source connectors in this worker | kafka.connect:type=connect-worker-metrics,name=source-record-rate,worker=([-.\w]+) |
source record total | The total number of source records produced by all source connectors in this worker | kafka.connect:type=connect-worker-metrics,name=source-record-total,worker=([-.\w]+) |
leader name | The name of the group leader | kafka.connect:type=connect-worker-metrics,name=leader-name,worker=([-.\w]+) |
state | The state of this worker, one of: rebalancing, running | kafka.connect:type=connect-worker-metrics,name=state,worker=([-.\w]+) |
offset commit success total | The total number of this worker's successful offset commits | kafka.connect:type=connect-worker-metrics,name=offset-commit-success-total,worker=([-.\w]+) |
offset commit success percentage | The average percentage of this worker's offset commits that succeeded | kafka.connect:type=connect-worker-metrics,name=offset-commit-success-percentage,worker=([-.\w]+) |
offset commit failure total | The total number of this worker's failed offset commits | kafka.connect:type=connect-worker-metrics,name=offset-commit-failure-total,worker=([-.\w]+) |
sink-record-{topic}-{partition}.records-lag-avg | The average lag, in number of records, of the committed offsets behind the consumer for the topic partition. | kafka.connect:type=sink-task-metrics,name=sink-record-{topic}-{partition}-lag-avg,connector=([-.\w]+),task=([\d]+) |
sink-record-{topic}-{partition}.records-lag-max | The maximum lag, in number of records, of the committed offsets behind the consumer for the topic partition. | kafka.connect:type=sink-task-metrics,name=sink-record-{topic}-{partition}-lag-max,connector=([-.\w]+),task=([\d]+) |
partition-count | The number of topic partitions assigned to this task belonging to the named sink connector in this worker. | kafka.connect:type=sink-task-metrics,name=partition-count,connector=([-.\w]+),task=([\d]+) |
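The per-partition lag metrics describe how far the committed offsets trail the consumer. The sketch below shows one way the latest, average, and maximum lag could be derived from a series of samples. It assumes that lag is the consumer position minus the committed offset, which is an interpretation of the descriptions and not something the proposal states, and the helper names are hypothetical.

```python
def records_lag(consumer_position, committed_offset):
    """Number of records the committed offset trails the consumer position."""
    return max(0, consumer_position - committed_offset)


def lag_summary(samples):
    """Summarize (consumer_position, committed_offset) samples for one topic partition.

    Returns None when there are no samples in the window.
    """
    lags = [records_lag(position, committed) for position, committed in samples]
    if not lags:
        return None
    return {
        "lag": lags[-1],                  # most recent sample
        "lag-avg": sum(lags) / len(lags),
        "lag-max": max(lags),
    }


# Hypothetical samples taken over one window for a single topic partition.
print(lag_summary([(100, 90), (150, 120), (200, 195)]))
```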
Worker Metrics
Metric Name | Description | MBean attribute |
---|
assigned-tasks | The number of tasks run in this worker (existing metric) | kafka.connect:type=connect-coordinator-metrics,name=assigned-tasks |
assigned-connectors | The number of connectors run in this worker (existing metric) | kafka.connect:type=connect-coordinator-metrics,name=assigned-connectors |
task-count | The number of tasks run in this worker | kafka.connect:type=connect-worker-metrics,name=task-count |
connector-count | The number of connectors run in this worker | kafka.connect:type=connect-worker-metrics,name=connector-count |
leader-name | The name of the group leader | kafka.connect:type=connect-worker-metrics,name=leader-name |
state | The state of this worker, one of: rebalancing, running | kafka.connect:type=connect-worker-metrics,name=state |
Worker Rebalance Metrics
Metric Name | Description | MBean attribute |
---|
rebalance-success-total | The total number of this worker's successful rebalances | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-success-total,worker=([-.\w]+) |
rebalance-success-percentage | The average percentage of this worker's rebalances that succeeded | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-success-percentage,worker=([-.\w]+) |
rebalance-failure-total | The total number of this worker's failed rebalances | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-failure-total,worker=([-.\w]+) |
rebalance-failure-percentage | The average percentage of this worker's rebalances that failed | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-failure-percentage,worker=([-.\w]+) |
rebalance-max-time | The maximum time spent by this worker to rebalance | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-max-time,worker=([-.\w]+) |
rebalance-99p-time | The 99th percentile time spent by this worker to rebalance during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-99p-time,worker=([-.\w]+) |
rebalance-95p-time | The 95th percentile time spent by this worker to rebalance during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-95p-time,worker=([-.\w]+) |
rebalance-90p-time | The 90th percentile time spent by this worker to rebalance during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-90p-time,worker=([-.\w]+) |
rebalance-75p-time | The 75th percentile time spent by this worker to rebalance during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-75p-time,worker=([-.\w]+) |
time-since-last-rebalance | The time since the most recent rebalance in this worker | kafka.connect:type=connect-worker-rebalance-metrics,name=time-since-last-rebalance,worker=([-.\w]+) |
task-failure-rate | The number of tasks that failed in this worker | kafka.connect:type=connect-worker-rebalance-metrics,name=task-failure-rate,worker=([-.\w]+) |
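To illustrate what the percentile rows report, here is a small nearest-rank percentile sketch over the rebalance durations recorded in one window. It is an illustration only: Kafka's metrics library may compute percentiles differently (for example from bucketed histograms), and the sample durations are made up.

```python
def percentile(samples, pct):
    """Nearest-rank percentile of a non-empty list; pct is an integer in 1..100."""
    if not samples:
        raise ValueError("no samples in the window")
    ordered = sorted(samples)
    # Ceiling of pct * n / 100 using integer arithmetic, clamped to at least 1.
    rank = max(1, (pct * len(ordered) + 99) // 100)
    return ordered[rank - 1]


# Hypothetical rebalance durations (in milliseconds) observed during the last window.
rebalance_ms = [120, 80, 200, 150, 95, 110, 300, 130, 90, 100]
for pct in (75, 90, 95, 99):
    print(f"rebalance-{pct}p-time", percentile(rebalance_ms, pct))
```

With only ten samples the 95th and 99th percentiles both resolve to the largest value, which is one reason a long default window is helpful for events as infrequent as rebalances.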
Worker REST Metrics
Metric Name | Description | MBean attribute |
---|
request-rate | The average per-second number of requests handled by the REST endpoints in this worker | kafka.connect:type=worker-rest-metrics,name=request-rate,worker=([-.\w]+) |
Proposed Changes
We will add the relevant metrics as specified in the Public Interfaces section. The scope of each metric is limited to a single worker, and these metrics can be aggregated across workers; this is why the worker name is included in all MBean attributes, except for the two existing metrics that will be left unmodified.
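Because the worker name is part of the MBean attribute, readings collected from several workers can be grouped by metric and combined. A minimal sketch of summing a total-style metric across workers follows; the function `sum_across_workers`, the worker names, and the values are hypothetical, and summation is only meaningful for counts and totals (not for percentages or maxima).

```python
import re
from collections import defaultdict

# Worker-scoped attributes end with worker=([-.\w]+), as in the rebalance metrics.
WORKER_MBEAN = re.compile(
    r"kafka\.connect:type=(?P<type>[-\w]+),"
    r"name=(?P<name>[-\w]+),"
    r"worker=(?P<worker>[-.\w]+)"
)


def sum_across_workers(readings):
    """Sum values per (type, name), ignoring which worker reported them."""
    totals = defaultdict(float)
    for mbean, value in readings.items():
        match = WORKER_MBEAN.fullmatch(mbean)
        if match:  # skip attributes that are not worker-scoped
            totals[(match.group("type"), match.group("name"))] += value
    return dict(totals)


# Hypothetical readings scraped from two workers.
readings = {
    "kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-success-total,worker=worker-1": 3,
    "kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-success-total,worker=worker-2": 4,
}
print(sum_across_workers(readings))
```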
Compatibility, Deprecation, and Migration Plan
Two metrics already exist and will not be changed. However, because the new metrics use a slightly different pattern for the MBean attributes that always includes the worker name, new metrics will be added to replicate these existing metrics with the new naming pattern.
Rejected Alternatives
None