Comment: Updated task-metrics to the correct mbean name connector-task-metrics


Status

Current state: Approved

Discussion thread: here

JIRA: here

...

An important part of deploying Kafka Connect is monitoring the health of the workers in a cluster and the connectors and tasks that have been deployed to the cluster. Although the producers and consumers used in Kafka Connect can be monitored, the Kafka Connect framework only has a few metrics capturing the number of connectors and tasks for each worker. To augment these existing metrics, we propose to add metrics to monitor more information about the connectors, tasks, and workers. All metrics reported by each worker are scoped to the activities within that worker.

There are several things that are out of scope for this proposal, though they may be addressed in future KIPs. First, this proposal expressly avoids changes to the Connect API, and therefore does not address how connector implementations can define their own connector-specific metrics. Second, Kafka Connect does not have any existing mechanism to aggregate metrics across multiple workers.

Public Interfaces

All of the following will be added via Kafka's metrics library, like most of the metrics in the Kafka brokers and other components. The context of all metrics is limited to the worker where the metrics are being reported, and all metrics include the worker ID in the MBean attribute (similar to how Kafka producer and consumer metrics include the client ID).

The metrics are defined as attributes on the specified MBean and are measured within the context of a single worker. All metrics defined below are at the INFO recording level.

Connector Metrics

MBean name: kafka.connect:type=connector-metrics,connector=([-.\w]+)

| Metric/Attribute Name | Description | Implemented |
| --- | --- | --- |
| connector-type | The type of the connector, one of: source, sink | 1.0.0 |
| connector-class | The name of the connector class | 1.0.0 |
| connector-version | The version of the connector class, as reported by the connector in this worker | 1.0.0 |
| status | The current status of the connector in this worker, one of: running, paused, stopped | 1.0.0 |

Common Task Metrics

MBean name: kafka.connect:type=connector-task-metrics,connector=([-.\w]+),task=([-.\w]+)

| Metric/Attribute Name | Description | Implemented |
| --- | --- | --- |
| status | The current status of this task, one of: unassigned, running, paused, failed, destroyed | 1.0.0 |
| pause-ratio | The fraction of time this task has spent in the paused state. | 1.0.0 |
| running-ratio | The fraction of time this task has spent in the running state. | 1.0.0 |
| offset-commit-success-percentage | The average percentage of this task's offset commit attempts that succeeded | 1.0.0 |
| offset-commit-failure-percentage | The average percentage of this task's offset commit attempts that failed or had an error | 1.0.0 |
| offset-commit-max-time-ms | The maximum time in milliseconds taken by this task to commit offsets | 1.0.0 |
| offset-commit-avg-time-ms | The average time in milliseconds taken by this task to commit offsets | 1.0.0 |
| offset-commit-99p-time-ms | The 99th percentile time in milliseconds spent by this task to commit offsets to Kafka | |
| offset-commit-95p-time-ms | The 95th percentile time in milliseconds spent by this task to commit offsets to Kafka | |
| offset-commit-90p-time-ms | The 90th percentile time in milliseconds spent by this task to commit offsets to Kafka | |
| offset-commit-75p-time-ms | The 75th percentile time in milliseconds spent by this task to commit offsets to Kafka | |
| offset-commit-50p-time-ms | The 50th percentile (median) time in milliseconds spent by this task to commit offsets to Kafka | |
| batch-size-max | The maximum size of the batches processed by the connector | 1.0.0 |
| batch-size-avg | The average size of the batches processed by the connector | 1.0.0 |
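The MBean name patterns above are regular expressions, so a monitoring script can use them directly to pull the connector and task out of a name. The following is a minimal sketch under that assumption; the helper `parse_mbean` and the connector name `my-sink` are hypothetical and only for illustration.

```python
import re

# Patterns copied from the MBean names listed in the tables above.
CONNECTOR_MBEAN = re.compile(
    r"kafka\.connect:type=connector-metrics,connector=([-.\w]+)"
)
TASK_MBEAN = re.compile(
    r"kafka\.connect:type=connector-task-metrics,"
    r"connector=([-.\w]+),task=([-.\w]+)"
)


def parse_mbean(name):
    """Return (kind, connector, task) for a recognized MBean name, else None.

    Hypothetical helper: 'kind' is "task" or "connector"; 'task' is None
    for connector-level MBeans.
    """
    m = TASK_MBEAN.fullmatch(name)
    if m:
        return ("task", m.group(1), m.group(2))
    m = CONNECTOR_MBEAN.fullmatch(name)
    if m:
        return ("connector", m.group(1), None)
    return None


# Example with a made-up connector name.
print(parse_mbean(
    "kafka.connect:type=connector-task-metrics,connector=my-sink,task=0"
))
```

The task pattern is tried first because it is the more specific of the two; names that match neither pattern are reported as None rather than raising.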

Source Task Metrics

MBean name: kafka.connect:type=source-task-metrics,connector=([-.\w]+),task=([\d]+)

| Metric/Attribute Name | Description | Implemented |
| --- | --- | --- |
| source-record-poll-rate | The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker. | 1.0.0 |
| source-record-poll-total | The number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker, since the task was last restarted. | 1.0.0 |
| source-record-write-rate | The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations. | 1.0.0 |
| source-record-write-total | The number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker, since the task was last restarted. | 1.0.0 |
| source-record-active-count | The most recent number of records that have been produced by this task but not yet completely written to Kafka. | 1.0.0 |
| source-record-active-count-max | The maximum number of records that have been produced by this task but not yet completely written to Kafka. | 1.0.0 |
| source-record-active-count-avg | The average number of records that have been produced by this task but not yet completely written to Kafka. | 1.0.0 |
| poll-batch-max-time-ms | The maximum time in milliseconds taken by this task to poll for a batch of source records | 1.0.0 |
| poll-batch-avg-time-ms | The average time in milliseconds taken by this task to poll for a batch of source records | 1.0.0 |
| poll-batch-99p-time-ms | The 99th percentile time in milliseconds spent by this task to poll for a batch of source records | |
| poll-batch-95p-time-ms | The 95th percentile time in milliseconds spent by this task to poll for a batch of source records | |
| poll-batch-90p-time-ms | The 90th percentile time in milliseconds spent by this task to poll for a batch of source records | |
| poll-batch-75p-time-ms | The 75th percentile time in milliseconds spent by this task to poll for a batch of source records | |
| poll-batch-50p-time-ms | The 50th percentile (median) time in milliseconds spent by this task to poll for a batch of source records | |


Sink Task Metrics

MBean name: kafka.connect:type=sink-task-metrics,connector=([-.\w]+),task=([\d]+)

| Metric/Attribute Name | Description | Implemented |
| --- | --- | --- |
| sink-record-read-rate | The average per-second number of records read from Kafka for this task belonging to the named sink connector in this worker. This is before transformations are applied. | 1.0.0 |
| sink-record-read-total | The total number of records read from Kafka by this task belonging to the named sink connector in this worker, since the task was last restarted. | 1.0.0 |
| sink-record-send-rate | The average per-second number of records output from the transformations and sent to this task belonging to the named sink connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations. | 1.0.0 |
| sink-record-send-total | The total number of records output from the transformations and sent to this task belonging to the named sink connector in this worker, since the task was last restarted. | 1.0.0 |
| sink-record-lag-max | The maximum lag, in number of records, of the committed offsets behind the consumer's position for any topic partition. | |
| sink-record-active-count | The most recent number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | 1.0.0 |
| sink-record-active-count-max | The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | 1.0.0 |
| sink-record-active-count-avg | The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task. | 1.0.0 |
| partition-count | The number of topic partitions assigned to this task belonging to the named sink connector in this worker. | 1.0.0 |
| offset-commit-seq-no | The current sequence number for offset commits | 1.0.0 |
| offset-commit-completion-rate | The average per-second number of offset commit completions that were completed successfully | 1.0.0 |
| offset-commit-completion-total | The total number of offset commit completions that were completed successfully | 1.0.0 |
| offset-commit-skip-rate | The average per-second number of offset commit completions that were received too late and skipped/ignored | 1.0.0 |
| offset-commit-skip-total | The total number of offset commit completions that were received too late and skipped/ignored | 1.0.0 |
| put-batch-max-time-ms | The maximum time in milliseconds taken by this task to put a batch of sink records | 1.0.0 |
| put-batch-avg-time-ms | The average time in milliseconds taken by this task to put a batch of sink records | 1.0.0 |
| put-batch-99p-time-ms | The 99th percentile time in milliseconds spent by this task to put a batch of sink records | |
| put-batch-95p-time-ms | The 95th percentile time in milliseconds spent by this task to put a batch of sink records | |
| put-batch-90p-time-ms | The 90th percentile time in milliseconds spent by this task to put a batch of sink records | |
| put-batch-75p-time-ms | The 75th percentile time in milliseconds spent by this task to put a batch of sink records | |
| put-batch-50p-time-ms | The 50th percentile (median) time in milliseconds spent by this task to put a batch of sink records | |
| flush-max-time-ms | The maximum time in milliseconds taken by this sink task to pre-commit/flush | |
| flush-99p-time-ms | The 99th percentile time in milliseconds spent by this sink task to pre-commit/flush | |
| flush-95p-time-ms | The 95th percentile time in milliseconds spent by this sink task to pre-commit/flush | |
| flush-90p-time-ms | The 90th percentile time in milliseconds spent by this sink task to pre-commit/flush | |
| flush-75p-time-ms | The 75th percentile time in milliseconds spent by this sink task to pre-commit/flush | |
| flush-50p-time-ms | The 50th percentile (median) time in milliseconds spent by this sink task to pre-commit/flush | |


MBean name: kafka.connect:type=sink-task-metrics,connector=([-.\w]+),task=([\d]+),topic=([-.\w]+),partition=([\d]+)

| Metric/Attribute Name | Description | Implemented |
| --- | --- | --- |
| sink-record-lag | The latest lag, in number of records, of the committed offsets behind the consumer's position for the topic partition. | |
| sink-record-lag-avg | The average lag, in number of records, of the committed offsets behind the consumer's position for the topic partition. | |
| sink-record-lag-max | The maximum lag, in number of records, of the committed offsets behind the consumer's position for the topic partition. | |

Worker Metrics

MBean name: kafka.connect:type=connect-worker-metrics

| Metric/Attribute Name | Description | Implemented |
| --- | --- | --- |
| task-count | The number of tasks run in this worker | 1.0.0 |
| connector-count | The number of connectors run in this worker | 1.0.0 |
| connector-startup-attempts-total | The total number of connector startups that this worker has attempted. | 1.0.0 |
| connector-startup-success-total | The total number of connector starts that succeeded. | 1.0.0 |
| connector-startup-success-percentage | The average percentage of this worker's connector starts that succeeded. | 1.0.0 |
| connector-startup-failure-total | The total number of connector starts that failed. | 1.0.0 |
| connector-startup-failure-percentage | The average percentage of this worker's connector starts that failed. | 1.0.0 |
| task-startup-attempts-total | The total number of task startups that this worker has attempted. | 1.0.0 |
| task-startup-success-total | The total number of task starts that succeeded. | 1.0.0 |
| task-startup-success-percentage | The average percentage of this worker's task starts that succeeded. | 1.0.0 |
| task-startup-failure-total | The total number of task starts that failed. | 1.0.0 |
| task-startup-failure-percentage | The average percentage of this worker's task starts that failed. | 1.0.0 |
| rest-request-rate | The average per-second number of requests handled by the REST endpoints in this worker | |
| rest-request-total | The total number of requests handled by the REST endpoints in this worker | |


Worker Rebalance Metrics


MBean name: kafka.connect:type=connect-worker-rebalance-metrics

| Metric/Attribute Name | Description | Implemented |
| --- | --- | --- |
| leader-name | The name of the group leader | 1.0.0 |
| epoch | The epoch or generation number of this worker | 1.0.0 |
| completed-rebalances-total | The total number of rebalances completed by this worker. | 1.0.0 |
| rebalancing | Whether this worker is currently rebalancing. | 1.0.0 |
| rebalance-max-time-ms | The maximum time in milliseconds spent by this worker to rebalance. | 1.0.0 |
| rebalance-avg-time-ms | The average time in milliseconds spent by this worker to rebalance. | 1.0.0 |
| rebalance-99p-time-ms | The 99th percentile time in milliseconds spent by this worker to rebalance during the last window (defaults to an hour) | |
| rebalance-95p-time-ms | The 95th percentile time in milliseconds spent by this worker to rebalance during the last window (defaults to an hour) | |
| rebalance-90p-time-ms | The 90th percentile time in milliseconds spent by this worker to rebalance during the last window (defaults to an hour) | |
| rebalance-75p-time-ms | The 75th percentile time in milliseconds spent by this worker to rebalance during the last window (defaults to an hour) | |
| rebalance-50p-time-ms | The 50th percentile (median) time in milliseconds spent by this worker to rebalance during the last window (defaults to an hour) | |
| time-since-last-rebalance-ms | The time in milliseconds since the most recent rebalance in this worker | 1.0.0 |

Configuration

The distributed and standalone worker configuration files will support the following properties. These exactly match the producer and consumer configurations of the same name. (The first three are already in the distributed worker configuration.)

| Configuration Field | Type | Default | Importance | Description |
| --- | --- | --- | --- | --- |
| metrics.sample.window.ms | long | 30000 | low | The window of time in milliseconds a metrics sample is computed over. Must be a non-negative number. |
| metrics.num.samples | int | 2 | low | The number of samples maintained to compute metrics. Must be a positive number. |
| metric.reporters | string | "" | low | A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. |
| metrics.recording.level | string | "INFO" | low | The highest recording level for metrics. Must be either "INFO" or "DEBUG". |
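To make the defaults and constraints in the table concrete, here is a minimal sketch of how a deployment script might check these four properties before starting a worker. The function `validate_metrics_config` is hypothetical and not part of Kafka Connect; it assumes the properties have already been read into a dictionary of strings and that the reporter list is comma-separated.

```python
def validate_metrics_config(props):
    """Apply the defaults and constraints from the table to worker properties.

    Hypothetical helper: 'props' is a dict of property name to string value.
    Returns the effective settings or raises ValueError.
    """
    window_ms = int(props.get("metrics.sample.window.ms", "30000"))
    num_samples = int(props.get("metrics.num.samples", "2"))
    level = props.get("metrics.recording.level", "INFO")
    # Assumption: reporters are given as a comma-separated list of class names.
    reporters = [
        name.strip()
        for name in props.get("metric.reporters", "").split(",")
        if name.strip()
    ]

    if window_ms < 0:
        raise ValueError("metrics.sample.window.ms must be non-negative")
    if num_samples < 1:
        raise ValueError("metrics.num.samples must be positive")
    if level not in ("INFO", "DEBUG"):
        raise ValueError('metrics.recording.level must be "INFO" or "DEBUG"')

    return {
        "metrics.sample.window.ms": window_ms,
        "metrics.num.samples": num_samples,
        "metric.reporters": reporters,
        "metrics.recording.level": level,
    }


# With no overrides, the defaults from the table apply.
print(validate_metrics_config({}))
```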


Proposed Changes

We will add the relevant metrics and worker configuration properties as specified in the Public Interfaces section.


The scope of each metric is limited to a single worker, and these metrics can be aggregated across workers; this is why the worker name is included in all MBean attributes.
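Because each metric is scoped to a single worker, any cluster-wide figure has to be computed outside Connect. The sketch below shows one possible way to do that for connector startup failures, assuming the per-worker values of connector-startup-attempts-total and connector-startup-failure-total have already been collected into dictionaries; the function name and the sample numbers are hypothetical. It sums the totals rather than averaging the per-worker percentages, so workers with more startup attempts weigh proportionally more.

```python
def cluster_startup_failure_percentage(workers):
    """Combine per-worker totals into one cluster-wide failure percentage.

    Hypothetical helper: each entry in 'workers' maps metric names from the
    Worker Metrics table to the values reported by one worker.
    """
    attempts = sum(w["connector-startup-attempts-total"] for w in workers)
    failures = sum(w["connector-startup-failure-total"] for w in workers)
    if attempts == 0:
        return 0.0
    return 100.0 * failures / attempts


# Made-up sample values for two workers.
samples = [
    {"connector-startup-attempts-total": 8, "connector-startup-failure-total": 1},
    {"connector-startup-attempts-total": 2, "connector-startup-failure-total": 1},
]
print(cluster_startup_failure_percentage(samples))  # 20.0
```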

Compatibility, Deprecation, and Migration Plan

Existing Connect coordinator metrics will not be changed. However, the new metrics use slightly different patterns for the MBean attributes to always include the worker name, so new metrics will be added to replicate these existing metrics with the new naming pattern.

The metrics.sample.window.ms, metrics.num.samples, and metric.reporters configurations already exist in the distributed worker; these will also be added to the standalone worker. The metrics.recording.level configuration will be added to both the distributed and standalone worker configurations. All four of these configurations have sensible default values, so users do not need to add or override them in their existing configuration files.

Rejected Alternatives

None

...