...

An important part of deploying Kafka Connect is monitoring the health of the workers in a cluster and of the connectors and tasks deployed to that cluster. Although the producers and consumers used in Kafka Connect can be monitored, the Kafka Connect framework itself has only a few metrics, which capture the number of connectors and tasks on each worker. To augment these existing metrics, we propose adding metrics that expose more information about connectors, tasks, and workers. All metrics reported by a worker are scoped to the activities within that worker.

...

All of the following metrics will be added via Kafka's metrics library, like most of the metrics in the Kafka brokers and other components. The context of each metric is limited to the worker where it is reported, and all metrics include the worker ID in the MBean attribute (similar to how Kafka producer and consumer metrics include the client ID).

 

...


Connector Metrics

Metric Name | Description | MBean attribute
connector-type | The type of the connector, one of: source, sink | kafka.connect:type=connector-metrics,name=connector-type,connector=([-.\w]+)
connector-class | The name of the connector class | kafka.connect:type=connector-metrics,name=connector-class,connector=([-.\w]+)
connector-version | The version of the connector class, as reported by the connector in this worker | kafka.connect:type=connector-metrics,name=connector-version,connector=([-.\w]+)
status | The current status of the connector in this worker, one of: running, paused, stopped | kafka.connect:type=connector-metrics,name=status,connector=([-.\w]+)
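To illustrate the naming pattern in the table above, the sketch below uses the JDK's `javax.management.ObjectName` to build a JMX name for a connector metric. It also rejects connector names that do not match the `[-.\w]+` group. The class `ConnectorMetricNames`, its method, and the connector name `my-sink.v1` are hypothetical. The code follows the `name=` key shown in the MBean attribute column and does not target any particular Kafka release.

```java
import java.util.regex.Pattern;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper (not part of Kafka): builds ObjectNames that follow the
// connector-metrics pattern from the table above.
final class ConnectorMetricNames {
    // Same character class as the connector=([-.\w]+) group in the MBean patterns.
    private static final Pattern CONNECTOR_NAME = Pattern.compile("[-.\\w]+");

    private ConnectorMetricNames() {}

    static ObjectName connectorMetric(String metric, String connector) {
        if (!CONNECTOR_NAME.matcher(connector).matches()) {
            throw new IllegalArgumentException("connector name must match [-.\\w]+: " + connector);
        }
        try {
            return new ObjectName("kafka.connect:type=connector-metrics,name=" + metric
                    + ",connector=" + connector);
        } catch (MalformedObjectNameException e) {
            // Thrown for metric names that are not valid unquoted ObjectName values,
            // for example ones containing ',' or '='.
            throw new IllegalArgumentException("invalid metric name: " + metric, e);
        }
    }

    public static void main(String[] args) {
        ObjectName status = connectorMetric("status", "my-sink.v1");
        System.out.println(status.getKeyProperty("type"));      // connector-metrics
        System.out.println(status.getKeyProperty("name"));      // status
        System.out.println(status.getKeyProperty("connector")); // my-sink.v1
    }
}
```

A monitoring tool could pass such a name to an MBean server lookup. Validating the connector name up front means a malformed name fails with a clear message rather than silently producing an ObjectName that never matches.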

Common Task Metrics

Metric Name | Description | MBean attribute
status | The current status of this task, one of: unassigned, running, paused, failed, destroyed | kafka.connect:type=task-metrics,name=status,connector=([-.\w]+),task=([\d]+)
pause-ratio | The fraction of time this task has spent in the paused state | kafka.connect:type=task-metrics,name=pause-ratio,connector=([-.\w]+),task=([\d]+)
offset-commit-success-percentage | The average percentage of this task's offset commit attempts that succeeded | kafka.connect:type=task-metrics,name=offset-commit-success-percentage,connector=([-.\w]+),task=([\d]+)
offset-commit-failure-percentage | The average percentage of this task's offset commit attempts that failed or had an error | kafka.connect:type=task-metrics,name=offset-commit-failure-percentage,connector=([-.\w]+),task=([\d]+)
offset-commit-avg-time | The average time taken by this task to commit offsets | kafka.connect:type=task-metrics,name=offset-commit-avg-time,connector=([-.\w]+),task=([\d]+)
offset-commit-max-time | The maximum time taken by this task to commit offsets | kafka.connect:type=task-metrics,name=offset-commit-max-time,connector=([-.\w]+),task=([\d]+)

...

batch-size-max | The maximum size of the batches processed by the connector | kafka.connect:type=task-metrics,name=batch-size-max,connector=([-.\w]+),task=([\d]+)
batch-size-avg | The average size of the batches processed by the connector | kafka.connect:type=task-metrics,name=batch-size-avg,connector=([-.\w]+),task=([\d]+)
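The following minimal sketch shows how `pause-ratio` and the offset commit percentages relate to the underlying events. It assumes a task reports each offset commit outcome and each pause or resume transition with an explicit, non-negative millisecond timestamp. `TaskMetricsSketch` is hypothetical. It computes values cumulatively from a start time, whereas the metrics library measures over its own sampling windows.

```java
// Hypothetical, simplified tracker (not Kafka's implementation) showing how
// pause-ratio and the offset-commit-*-percentage values derive from raw events.
final class TaskMetricsSketch {
    private long commitSuccesses;
    private long commitFailures;
    private long pausedMillis;      // total time spent in completed pauses
    private long pausedSince = -1L; // start of the current pause, or -1 if running

    void recordCommit(boolean succeeded) {
        if (succeeded) {
            commitSuccesses++;
        } else {
            commitFailures++;
        }
    }

    void pause(long nowMillis) {
        if (pausedSince < 0) {
            pausedSince = nowMillis;
        }
    }

    void resume(long nowMillis) {
        if (pausedSince >= 0) {
            pausedMillis += nowMillis - pausedSince;
            pausedSince = -1L;
        }
    }

    double commitSuccessPercentage() {
        long attempts = commitSuccesses + commitFailures;
        return attempts == 0 ? 0.0 : 100.0 * commitSuccesses / attempts;
    }

    double commitFailurePercentage() {
        long attempts = commitSuccesses + commitFailures;
        return attempts == 0 ? 0.0 : 100.0 * commitFailures / attempts;
    }

    // Fraction (0.0 to 1.0) of [startMillis, nowMillis] spent paused, including an ongoing pause.
    double pauseRatio(long startMillis, long nowMillis) {
        long elapsed = nowMillis - startMillis;
        if (elapsed <= 0) {
            return 0.0;
        }
        long paused = pausedMillis + (pausedSince >= 0 ? nowMillis - pausedSince : 0L);
        return (double) paused / elapsed;
    }
}
```

For example, three successful commits and one failed commit yield 75.0 and 25.0. A pause from 100 ms to 350 ms within a 1,000 ms interval yields a pause ratio of 0.25.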

Source Task Metrics

 

Metric Name | Description | MBean attribute
source-record-poll-rate | The average per-second number of records produced/polled by this task belonging to the named source connector in this worker. This is before transformations are applied. | kafka.connect:type=source-task-metrics,name=source-record-poll-rate,connector=([-.\w]+),task=([\d]+)
source-record-write-rate | The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied. | kafka.connect:type=source-task-metrics,name=source-record-write-rate,connector=([-.\w]+),task=([\d]+)
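`source-record-poll-rate` is measured before transformations and `source-record-write-rate` after them. The gap between the two therefore approximates the share of records the transformations filtered out. This minimal sketch assumes both rates are read at roughly the same time. `SourceTransformFilter` is a hypothetical helper.

```java
// Hypothetical helper: estimates the fraction of polled records that the
// transformations filtered out, from source-record-poll-rate (before transforms)
// and source-record-write-rate (after transforms).
final class SourceTransformFilter {
    private SourceTransformFilter() {}

    static double filteredFraction(double pollRate, double writeRate) {
        if (pollRate <= 0.0) {
            return 0.0; // nothing was polled, so nothing was filtered
        }
        double fraction = 1.0 - writeRate / pollRate;
        // The two rates are sampled independently and writes trail polls slightly,
        // so clamp transient values outside [0, 1].
        return Math.max(0.0, Math.min(1.0, fraction));
    }
}
```

A task polling 200 records per second and writing 150 per second has a filtered fraction of 0.25.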

 

Sink Task Metrics

Metric Name | Description | MBean attribute
sink-record-read-rate | The average per-second number of records read from Kafka for this task belonging to the named sink connector in this worker. This is before transformations are applied. | kafka.connect:type=sink-task-metrics,name=sink-record-read-rate,connector=([-.\w]+),task=([\d]+)
sink-record-send-rate | The average per-second number of records output from the transformations and sent to this task belonging to the named sink connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations. | kafka.connect:type=sink-task-metrics,name=sink-record-send-rate,connector=([-.\w]+),task=([\d]+)
sink-record-lag-max | The maximum lag, in number of records, for any partition in this window | kafka.connect:type=sink-task-metrics,name=sink-record-lag-max,connector=([-.\w]+),task=([\d]+)
sink-record-{topic}-{partition}.records-lag | The latest lag, in number of records, by which the committed offsets trail the consumer's position for the topic partition | kafka.connect:type=sink-task-metrics,name=sink-record-{topic}-{partition}-lag,connector=([-.\w]+),task=([\d]+)
sink-record-{topic}-{partition}.records-lag-avg | The average lag, in number of records, by which the committed offsets trail the consumer's position for the topic partition | kafka.connect:type=sink-task-metrics,name=sink-record-{topic}-{partition}-lag-avg,connector=([-.\w]+),task=([\d]+)
sink-record-{topic}-{partition}.records-lag-max | The maximum lag, in number of records, by which the committed offsets trail the consumer's position for the topic partition | kafka.connect:type=sink-task-metrics,name=sink-record-{topic}-{partition}-lag-max,connector=([-.\w]+),task=([\d]+)
partition-count | The number of topic partitions assigned to this task belonging to the named sink connector in this worker | kafka.connect:type=sink-task-metrics,name=partition-count,connector=([-.\w]+),task=([\d]+)
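The per-partition lag described above is the difference between the consumer's position and the last committed offset. The sketch below computes it for each partition and takes the maximum across partitions, in the spirit of `sink-record-lag-max`. It keys partitions by hypothetical `topic-partition` strings. `SinkLagSketch` is illustrative and is not Kafka's implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not Kafka's implementation) of per-partition sink lag:
// how far the committed offset trails the consumer's position.
final class SinkLagSketch {
    private SinkLagSketch() {}

    // Partitions with no entry in committed are treated as committed at offset 0.
    static Map<String, Long> lagPerPartition(Map<String, Long> positions, Map<String, Long> committed) {
        Map<String, Long> lags = new TreeMap<>();
        for (Map.Entry<String, Long> entry : positions.entrySet()) {
            long committedOffset = committed.getOrDefault(entry.getKey(), 0L);
            // Never report negative lag if the two snapshots were taken at slightly different times.
            lags.put(entry.getKey(), Math.max(0L, entry.getValue() - committedOffset));
        }
        return lags;
    }

    // Maximum lag across all partitions, or 0 if there are none.
    static long maxLag(Map<String, Long> lags) {
        long max = 0L;
        for (long lag : lags.values()) {
            max = Math.max(max, lag);
        }
        return max;
    }
}
```

With positions of 100 and 50 and committed offsets of 90 and 50 for two partitions, the lags are 10 and 0, and the maximum is 10.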

Worker Metrics

 

Metric Name | Description | MBean attribute
assigned-tasks | The number of tasks run in this worker (existing metric) | kafka.connect:type=connect-coordinator-metrics,name=assigned-tasks
assigned-connectors | The number of connectors run in this worker (existing metric) | kafka.connect:type=connect-coordinator-metrics,name=assigned-connectors
task-count | The number of tasks run in this worker | kafka.connect:type=connect-worker-metrics,name=task-count
connector-count | The number of connectors run in this worker | kafka.connect:type=connect-worker-metrics,name=connector-count
leader-name | The name of the group leader | kafka.connect:type=connect-worker-metrics,name=leader-name
state | The state of this worker, one of: rebalancing, running | kafka.connect:type=connect-worker-metrics,name=state


Worker Rebalance Metrics

Metric Name | Description | MBean attribute
rebalance-success-total | The total number of this worker's successful rebalances | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-success-total,worker=([-.\w]+)
rebalance-success-percentage | The average percentage of this worker's rebalances that succeeded | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-success-percentage,worker=([-.\w]+)
rebalance-failure-total | The total number of this worker's failed rebalances | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-failure-total,worker=([-.\w]+)
rebalance-failure-percentage | The average percentage of this worker's rebalances that failed | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-failure-percentage,worker=([-.\w]+)
rebalance-max-time | The maximum time this worker spent rebalancing | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-max-time,worker=([-.\w]+)
rebalance-99p-time | The 99th percentile time this worker spent rebalancing during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-99p-time,worker=([-.\w]+)
rebalance-95p-time | The 95th percentile time this worker spent rebalancing during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-95p-time,worker=([-.\w]+)
rebalance-90p-time | The 90th percentile time this worker spent rebalancing during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-90p-time,worker=([-.\w]+)
rebalance-75p-time | The 75th percentile time this worker spent rebalancing during the last window (defaults to an hour) | kafka.connect:type=connect-worker-rebalance-metrics,name=rebalance-75p-time,worker=([-.\w]+)
time-since-last-rebalance | The time since the most recent rebalance in this worker | kafka.connect:type=connect-worker-rebalance-metrics,name=time-since-last-rebalance,worker=([-.\w]+)
task-failure-rate | The number of tasks that failed in this worker | kafka.connect:type=connect-worker-rebalance-metrics,name=task-failure-rate,worker=([-.\w]+)
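The percentile metrics above summarize the distribution of rebalance durations within a window. Kafka's metrics library includes a histogram-based `Percentiles` stat. The sketch below instead uses the exact nearest-rank method on raw samples, only to show what a value such as `rebalance-99p-time` means. `RebalancePercentiles` is hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch: exact nearest-rank percentiles over the rebalance durations
// recorded in one window. This is not how Kafka's histogram-based stat works.
final class RebalancePercentiles {
    private RebalancePercentiles() {}

    // percentile is an integer in 1..100; durations are in milliseconds.
    static long nearestRank(long[] durationsMs, int percentile) {
        if (durationsMs.length == 0) {
            throw new IllegalArgumentException("no samples in window");
        }
        if (percentile < 1 || percentile > 100) {
            throw new IllegalArgumentException("percentile must be in 1..100");
        }
        long[] sorted = durationsMs.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        // rank = ceil(percentile / 100 * n), computed with integer arithmetic.
        int rank = (percentile * sorted.length + 99) / 100;
        return sorted[rank - 1];
    }
}
```

With four rebalances of 100, 200, 300, and 400 ms, the 75th percentile is 300 ms and the 99th percentile is 400 ms.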


Worker REST Metrics

Metric Name | Description | MBean attribute
request-rate | The average per-second number of requests handled by the REST endpoints in this worker | kafka.connect:type=worker-rest-metrics,name=request-rate,worker=([-.\w]+)
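A per-second request rate such as `request-rate` is the number of requests observed in a recent window divided by the window length. This minimal sketch assumes a single sliding window with explicit millisecond timestamps. `SlidingWindowRate` is hypothetical and is not Kafka's `Rate` stat, which is built on sampled windows.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sliding-window rate: counts requests whose timestamps fall within
// the last windowMillis and divides by the window length in seconds.
final class SlidingWindowRate {
    private final long windowMillis;
    private final Deque<Long> timestamps = new ArrayDeque<>();

    SlidingWindowRate(long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("window must be positive");
        }
        this.windowMillis = windowMillis;
    }

    // Call once per handled REST request; timestamps must be non-decreasing.
    void record(long nowMillis) {
        timestamps.addLast(nowMillis);
        evict(nowMillis);
    }

    // Average requests per second over the window ending at nowMillis.
    double ratePerSecond(long nowMillis) {
        evict(nowMillis);
        return timestamps.size() * 1000.0 / windowMillis;
    }

    // Drop requests that are older than the window (timestamp <= now - window).
    private void evict(long nowMillis) {
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= nowMillis - windowMillis) {
            timestamps.removeFirst();
        }
    }
}
```

With a 10-second window and one request per second, the rate is 1.0 while all ten requests are in the window. It drops to 0.9 once the oldest request ages out.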

 

Proposed Changes

We will add the relevant metrics as specified in the Public Interfaces section. The scope of each metric is limited to a single worker, and these metrics can be aggregated across workers; this is why the worker name is included in all MBean attributes, except for the two existing metrics, which will be left unmodified.
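When per-worker metrics are aggregated across workers, counts and rates can be summed, but percentages must be weighted by the number of events behind them. This minimal sketch uses a hypothetical per-worker snapshot type with illustrative field names that are not Kafka metric names.

```java
import java.util.List;

// Hypothetical sketch: combining per-worker values into cluster-wide figures.
final class ClusterAggregation {
    private ClusterAggregation() {}

    // Illustrative per-worker snapshot; the field names are not Kafka metric names.
    static final class WorkerSnapshot {
        final double recordWriteRate;          // records per second written by this worker's tasks
        final long commitAttempts;             // offset commit attempts in the same period
        final double commitSuccessPercentage;  // 0.0 to 100.0

        WorkerSnapshot(double recordWriteRate, long commitAttempts, double commitSuccessPercentage) {
            this.recordWriteRate = recordWriteRate;
            this.commitAttempts = commitAttempts;
            this.commitSuccessPercentage = commitSuccessPercentage;
        }
    }

    // Rates from different workers are additive.
    static double totalWriteRate(List<WorkerSnapshot> workers) {
        double total = 0.0;
        for (WorkerSnapshot w : workers) {
            total += w.recordWriteRate;
        }
        return total;
    }

    // Percentages are weighted by attempts; a plain average would give a worker
    // with few commits as much weight as a busy one.
    static double weightedSuccessPercentage(List<WorkerSnapshot> workers) {
        long attempts = 0L;
        double successes = 0.0;
        for (WorkerSnapshot w : workers) {
            attempts += w.commitAttempts;
            successes += w.commitAttempts * w.commitSuccessPercentage / 100.0;
        }
        return attempts == 0 ? 0.0 : 100.0 * successes / attempts;
    }
}
```

Consider one worker with 90 commits at 100% success and another with 10 commits at 0%. The weighted result is 90%, whereas a plain average of the two percentages would report 50%.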

Compatibility, Deprecation, and Migration Plan

Two existing metrics will not be changed. However, because the new metrics use a slightly different MBean attribute pattern that always includes the worker name, new metrics will be added that replicate these two existing metrics under the new naming pattern.

Rejected Alternatives

None

