Status

Current state: Under discussion - WIP; not yet announced

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Being able to centrally, proactively, and reactively monitor and troubleshoot problems with Kafka clients is becoming increasingly important as the use of Kafka expands within organizations as well as for hosted Kafka services. The typical Kafka client user is now an application owner with little experience in operating Kafka clients, while the cluster operator has profound Kafka knowledge but little insight into the client application.

...

This proposal aims to provide a generic and standardised interface through which a cluster operator or an external system may request, and a client can push, client-side metrics to the broker; a plugin interface for how those metrics are handled by the broker; and a minimum set of standard pre-defined metrics that a Kafka client should expose through this interface. A supporting client should have metrics enabled by default, require no extra configuration, and provide at least the standardised metrics outlined later in this proposal.

Public Interfaces

This KIP introduces the following new public interfaces:

  • New PushTelemetryRequest protocol request type.
  • Updated metrics receiver interface in the broker to accommodate the new semantics and format (TBD).
  • A new CLIENT_METRICS ConfigEntry resource type for configuring metrics subscriptions.
  • kafka-client-metrics.sh CLI script to configure and list client metrics subscriptions.

Protocol

Code Block
PushTelemetryRequest {
	ClientInstanceId string 			// client.id + "-" + UUID4 unique for this client instance.
										// Must be set to Null on the first request, and to the
										// returned ClientInstanceId from the first response
										// for all subsequent requests to any broker.
	Terminating bool					// Client is terminating.
	ContentType string					// "application/x-protobuf" or "application/x-protobuf+zstd"
	Metrics binary						// Format specified by ContentType, possibly compressed.
}

PushTelemetryResponse {
	ThrottleTime int32					// Standard and metric-specific throttling
	ErrorCode int16						// Error code
	ClientInstanceId string				// Will be set to a generated unique id if the
										// request ClientInstanceId was Null, else
										// this field will be set to Null.
	AcceptedContentTypes Array[string]	// Accepted metric formats.
                                       	// Only returned if Request.ContentType is null
								      	// or empty, or ErrorCode is set to
								      	// UnsupportedFormat(?).
										// Also includes the supported compression types.
	NextPushMs int32					// Delay until next PushTelemetry
	RequestedMetrics Array[string]		// Requested Metrics prefix string match.
										// Empty array: No metrics subscribed.
										// Array[0] empty string: All metrics subscribed.
										// Array[..]: prefix string match
}


Broker metrics receiver

TBD / Xavier Léauté

Client API

Retrieve broker-generated client instance id:

...

If the client has not yet requested a client instance id from the broker this call will block up to the timeout, and trigger an immediate request which is otherwise delayed at client startup (see Client behaviour chapter). If no client instance id can be retrieved within the timeout an error is returned/raised (timeout, feature not supported by broker, auth failure, etc).
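As a rough illustration of the blocking semantics described above, here is a minimal Python sketch. The class, the `client_instance_id` method name, and the internal handshake event are hypothetical illustrations, not part of any real client API:

```python
import threading

class TelemetryState:
    """Hypothetical client-side telemetry state; not a real client API."""

    def __init__(self):
        self._handshake_done = threading.Event()
        self._instance_id = None

    def _on_handshake_response(self, instance_id):
        # Called when the first PushTelemetryResponse arrives with the
        # broker-generated ClientInstanceId.
        self._instance_id = instance_id
        self._handshake_done.set()

    def client_instance_id(self, timeout_s):
        # A real client would also trigger an immediate handshake request
        # here (otherwise it is delayed at client startup), then block
        # up to the timeout waiting for the response.
        if not self._handshake_done.wait(timeout_s):
            raise TimeoutError("no client instance id within timeout")
        return self._instance_id
```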


Client configuration

enable.metrics.push=true|false (default: true) - Allows disabling the functionality outlined in this proposal, which is enabled by default.

enable.private.metrics.push=true|false (default: false) - Enables metrics and labels that are considered private, e.g., hostname and host operating system.


New error codes

TBD

Proposed Changes

Overview

This feature is made up of the following components:

  • PushTelemetryRequest|Response - protocol request used by the client to send metrics to any broker it is connected to. The response is used to control push frequency, what metrics the client should push, etc.
  • Standardised metrics - a set of standardised metrics that all supporting clients should provide.
  • AdminAPI config - the AdminAPI configuration interface with a new CLIENT_METRICS resource type is used to manage metrics subscriptions.
  • Client metrics plugin / extending the MetricsReporter interface - a broker plugin interface that performs something meaningful with the metrics. This plugin will typically forward the metrics to a time series database. It is recommended that any broker-side processing be kept to a minimum for scalability and performance reasons.


Client identification (CLIENT_INSTANCE_ID)

The added value of metrics collection is limited if the metrics can’t be tied to an entity, in this case a client instance. While aggregate metrics do provide value for trends and superficial monitoring, for metrics to be truly useful there needs to be a way to identify a particular client instance, where it runs, by whom, etc.

...

As it is not feasible for a Kafka client instance to automatically generate or acquire a unique identity for the application process it runs in, and as we can’t rely on the user to configure one, we treat the application instance id as an optional future nice-to-have that may be included as a metrics label if it has been set by the user. This allows a more direct mapping of client instance to application instance and vice versa. However, due to these constraints, adding an application instance id is outside the scope of this proposal.

Mapping

Mapping the client instance id to an actual application instance running on a (virtual) machine can be done by inspecting the metrics resource labels, such as the client source address and source port, or security principal, all of which are added by the receiving broker. This will allow the operator together with the user to identify the actual application instance.


Metrics naming and format

The OpenTelemetry specification will be used as the metrics serialization format as well as to define how metrics are constructed when it comes to naming, metric type, etc.

...

client_software_name=confluent-kafka-python
client_software_version=v2.1.3
client_instance_id=someClientId-B64CD139-3975-440A-91D4
idempotence=true
transactional_id=someTxnApp

Sparse metrics

To keep metrics volume down it is recommended that a client only sends metrics with a recorded value.

Compression

As metric names and labels are highly repeated it is recommended that the serialized metrics are compressed prior to sending them to the broker if the serialized uncompressed size exceeds 1000 bytes.
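To illustrate the size threshold, a sketch using zlib as a stand-in for the negotiated codec (the actual supported codecs, and the corresponding content type string such as the hypothetical "application/x-protobuf+zlib" below, are governed by the broker's AcceptedContentTypes):

```python
import zlib

UNCOMPRESSED_THRESHOLD = 1000  # bytes, per the recommendation above

def maybe_compress(serialized_metrics: bytes):
    """Compress the serialized metrics blob only if it exceeds the threshold."""
    if len(serialized_metrics) <= UNCOMPRESSED_THRESHOLD:
        return "application/x-protobuf", serialized_metrics
    # zlib stands in here for whichever codec was negotiated (e.g., zstd).
    return "application/x-protobuf+zlib", zlib.compress(serialized_metrics)
```

Because metric names and labels repeat heavily, the compressed payload for the standard metrics is typically a small fraction of the original size.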

...

Tests indicate that a compression ratio up to 10x is possible for the standard metrics.

Client behaviour

A client that supports this metrics interface and identifies a supporting broker (by detecting at least PushTelemetryRequestV0 in the ApiVersionResponse) will start off, after a short delay, by sending an empty PushTelemetryRequest, with the ClientInstanceId field set to Null, to one randomly selected connected broker. This gathers its client instance id, the desired push interval, the subscribed push metrics, and the metrics formats the broker supports. It is effectively a metrics handshake that is performed each time a client selects a new broker to send metrics to.

...

If PushTelemetryResponse.RequestedMetrics indicates that no metrics are desired the client must still send an empty PushTelemetryRequest by the next NextPushMs interval to get an updated set of desired or immediate metrics as well as a new NextPushMs time.
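The scheduling rule above can be sketched as follows (an illustrative fragment, not a real client implementation; the `collect` callable is hypothetical):

```python
def next_push_payload(requested_metrics, collect):
    """Decide what to push when the NextPushMs deadline expires.

    requested_metrics: the RequestedMetrics array from the last response.
    collect: hypothetical callable returning the serialized metrics blob.
    """
    if not requested_metrics:
        # Empty array, i.e., no metrics subscribed: still push an empty
        # request so the broker can return an updated subscription and a
        # new NextPushMs.
        return b""
    # A single empty string means all metrics are subscribed; otherwise
    # each entry is a prefix match - either way, collect and push.
    return collect()
```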

Client termination

When a client is being shut down it should send its final metrics regardless of the NextPushMs time, but only if the last PushTelemetryResponse.RequestedMetrics array length was greater than zero.

...

The metrics should contain the reason for the client termination by including the client.terminating metric with the label “reason” set to a human-readable explanation of why the client is being shut down, such as “Fatal error: Static consumer fenced by newer instance” or “Consumer closed by application”.


Error handling

The following table lists the actions to be taken by the client when PushTelemetryResponse.ErrorCode is set to a non-zero value.

Error code | Reason | Action
..AuthorizationFailed | Client is not permitted to send metrics. | Raise a non-fatal error to the application and schedule the next PushTelemetryRequest in 30 minutes.
UnsupportedEncoding | Broker does not support the client’s metric encoding type. | Raise a non-fatal error to the application and schedule the next PushTelemetry in 30 minutes.
InvalidRecord | Broker failed to decode or validate the client’s encoded metrics. | Raise a non-fatal error to the application and schedule the next PushTelemetry in 5 minutes.
FunctionalityNotEnabled | Client sent a PushTelemetryRequest to a broker which does not have a client metrics receiver plugin configured. This indicates a faulty client that does not respect the ApiVersionResponse, which in this case should not include PushTelemetryRequest. | Raise a non-fatal error to the application and schedule the next PushTelemetry in 30 minutes on another broker.
RateLimited | Client sent a non-Terminate push request before the NextPushMs time had expired. | The response will include only the NextPushMs, which the client will use to adjust its next push time.
UnsupportedCompressionType | Client’s compression type is not supported by the broker. | The client should switch to another compression type. When all compression types are exhausted, the client should fall back to the uncompressed format.

The 5 and 30 minute retry intervals eventually trigger a retry and avoid having to restart clients if the cluster metrics configuration is temporarily disabled, e.g., by operator error.

How non-fatal errors are propagated to the application is client- and language-specific; simply logging the error is sufficient.
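The retry scheduling from the table above can be summarized in a small sketch (error code names follow the table and are still tentative):

```python
# Minutes until the next PushTelemetryRequest per error code, per the table.
RETRY_MINUTES = {
    "AuthorizationFailed": 30,
    "UnsupportedEncoding": 30,
    "InvalidRecord": 5,
    "FunctionalityNotEnabled": 30,  # retried on another broker
}

def next_push_delay_ms(error_code, default_push_ms):
    """Return the delay before the next push attempt, in milliseconds."""
    minutes = RETRY_MINUTES.get(error_code)
    if minutes is None:
        # e.g., RateLimited: the NextPushMs from the response applies instead.
        return default_push_ms
    return minutes * 60 * 1000
```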

Opting out

Client metrics should be enabled by default in the client but a user may disable the metrics functionality by setting the enable.metrics.push property to false.

...

Also see the Privacy chapter below.



Receiving broker behaviour

We allow the client to send PushTelemetryRequest to any connected broker that reports PushTelemetryRequest in its ApiVersionResponse.

...

If there is no client metrics receiver plugin configured on the broker, it must not include PushTelemetryRequest in its ApiVersionResponse, and any unsolicited PushTelemetryRequest must be answered immediately with PushTelemetryResponse.ErrorCode set to FunctionalityNotEnabled.


PushTelemetryRequest handling

Validation

Validation of the encoded metrics is the task of the ClientMetricsReceiver. If the encoding type is unsupported, the response is returned with ErrorCode set to UnsupportedEncoding; should decoding or validation of the binary metrics blob fail, the ErrorCode is set to InvalidRecord.

Rate-limiting and NextPushMs

The receiving broker’s standard quota-based throttling should operate as usual for PushTelemetryRequest. In addition, PushTelemetryRequest is subject to rate-limiting based on the calculated NextPushMs interval derived from the configured metrics subscriptions. Should the client send a push request prior to expiry of the previously calculated NextPushMs, the broker will discard the metrics and return a PushTelemetryResponse with ErrorCode set to RateLimited and NextPushMs set to the remaining time; all other fields should be left blank.
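A sketch of this broker-side check (function and variable names are illustrative; a terminating client's final push is exempt, since RateLimited only applies to non-Terminate requests):

```python
def check_push(now_ms, next_allowed_push_ms, terminating):
    """Return (error_code, next_push_ms) for an incoming PushTelemetryRequest.

    A terminating client's final push is accepted regardless of the
    deadline; other pushes before the deadline are rejected.
    """
    if not terminating and now_ms < next_allowed_push_ms:
        # Discard the metrics; tell the client how long to wait.
        return "RateLimited", next_allowed_push_ms - now_ms
    return None, None
```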

...

If the receiving broker has no prior metrics state but at least one matching metrics subscription for a client instance id the first returned NextPushMs is a randomised value between 0 and the lowest metrics interval of the subscription. This attempts to distribute the client metrics pushes in case a large number of clients are started simultaneously.
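The randomised first interval can be sketched directly (a hypothetical helper, not broker code):

```python
import random

def first_next_push_ms(subscription_intervals_ms):
    """First NextPushMs for a client instance with no prior metrics state:
    a random value between 0 and the lowest subscribed interval."""
    return random.uniform(0, min(subscription_intervals_ms))
```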

Metrics subscription

Metrics subscriptions are configured through the standard Kafka Admin API configuration interface with the new resource type CLIENT_METRICS. The resource name is a prefix match of the client instance id (which, as stated previously, is a concatenation of the client.id and a unique UUID string). The configuration entry name is a prefix-matched metric name (e.g., “client.producer.partition.”) while the configuration value is the desired collection/push interval in milliseconds.
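Both the resource name and the entry name are prefix matches; a sketch of how a broker might resolve the subscribed metrics for a given client instance (the data structures are illustrative, not the actual broker representation):

```python
def subscribed_metrics(client_instance_id, subscriptions):
    """Resolve metric-prefix -> interval-ms entries whose resource name
    (a client instance id prefix) matches the given client instance id.

    subscriptions: {resource_name_prefix: {metric_name_prefix: interval_ms}}
    """
    matched = {}
    for id_prefix, entries in subscriptions.items():
        if client_instance_id.startswith(id_prefix):
            matched.update(entries)
    return matched
```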

...

This client instance specific state is maintained in memory up to MAX(60*1000, NextPushMs * 3) milliseconds and is used to enforce the push interval rate-limiting. There is no persistence of client instance metrics state across broker restarts or between brokers.
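The retention bound can be expressed directly (a sketch of the MAX(60*1000, NextPushMs * 3) rule above):

```python
def state_retention_ms(next_push_ms: int) -> int:
    """How long the broker keeps per-client-instance metrics state in
    memory: at least one minute, otherwise three push intervals."""
    return max(60 * 1000, next_push_ms * 3)
```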



Client metrics and metric labels

Clients that support this KIP must provide the following metrics, as long as they’re relevant for the client implementation. These metrics do not need to be comparable between client implementations, e.g., io.wait.time may have somewhat different semantics between Java client and librdkafka.

...

A client implementation must not add additional metrics or labels under the “org.apache.kafka/” prefix without a corresponding accepted KIP.

Private metrics

The default metrics collection on the client must take extra care not to expose any information about the application and system it runs on as they may identify internal projects, infrastructure, etc, that the user/customer may not want to expose to the Kafka infrastructure owner. This includes information such as hostname, operating system, credentials, runtime environment, etc.

...

The metrics and labels covered by this constraint are indicated in-place in the following tables.


Metric types

The metric types in the following tables correspond to the OpenTelemetry v1 metrics protobuf message types. A short summary:

  • IntSum, DoubleSum  - Monotonic total count meter (Counter). Suitable for total number of X counters, e.g., total number of bytes sent.
  • IntGauge, DoubleGauge  - Non-monotonic current value meter (UpDownCounter). Suitable for current value of Y, e.g., current queue count.
  • IntHistogram, DoubleHistogram  - Value distribution meter (ValueRecorder). Suitable for latency values, etc.


Client instance-level metrics

Metric name | Type | Labels | Description
client.connection.creations | IntSum | FIXME: with broker_id label? | Total number of broker connections made.
client.connection.count | IntGauge | | Current number of broker connections.
client.connection.errors | IntSum | reason | Total number of broker connection failures. The ‘reason’ label values: disconnect - remote peer closed the connection; auth - authentication failure; TLS - TLS failure; timeout - client request timeout; close - client closed the connection.
client.request.rtt | IntGauge | broker_id | Average request latency / round-trip time to the broker and back.
client.request.queue.latency | IntGauge | broker_id | Average queue latency while a request waits to be sent to the broker.
client.request.queue.count | IntGauge | broker_id | Number of requests in queue waiting to be sent to the broker.
client.request.success | IntSum | broker_id | Number of successful requests to the broker, that is, where a response is received without a request-level error (but there may be per-sub-resource errors, e.g., errors for certain partitions within an OffsetCommitResponse).
client.request.errors | IntSum | broker_id, reason | Number of failed requests. The ‘reason’ label values: timeout - client timed out the request; disconnect - broker connection was closed before the response could be received; error - request-level protocol error.
client.io.wait.time | IntGauge | | Amount of time waiting for socket I/O. FIXME: histogram? Avg? Total? Should this be for POLLOUT only?


As the client will not know the broker id of its bootstrap servers the broker_id label should be set to “bootstrap”. FIXME: Should we have a broker_address (“host:port”) for this purpose?


Client instance-level Producer metrics

Metric name | Type | Labels | Description
client.producer.record.bytes | IntGauge | | Total amount of record memory currently in use by the producer. This includes the record fields (key, value, etc.) as well as any implementation-specific overhead (objects, etc.).
client.producer.record.count | IntGauge | | Total number of records currently handled by the producer.
client.producer.queue.max.bytes | IntGauge | | Total amount of queue/buffer memory allowed on the producer queue(s).
client.producer.queue.bytes | IntGauge | | Current amount of memory used in producer queues.
client.producer.queue.max.messages | IntGauge | | Maximum number of messages allowed on the producer queue(s).
client.producer.queue.messages | IntGauge | | Current number of messages on the producer queue(s).


Client instance-level Consumer metrics

Metric name | Type | Labels | Description
client.consumer.poll.interval | DoubleGauge (FIXME) | | The interval at which the application calls poll(), in seconds.
client.consumer.poll.last | DoubleGauge | | The number of seconds since the last poll() invocation.
client.consumer.poll.latency | DoubleGauge | | The time it takes poll() to return a new message to the application.
client.consumer.commit.count | IntSum | | Number of commit requests sent.
client.consumer.group.assignment.partition.count | IntGauge | | Number of partitions currently assigned to this consumer by the group leader.
client.consumer.assignment.partition.count | IntGauge | | Number of partitions currently assigned to this consumer, either through the group protocol or through assign().
client.consumer.group.rebalance.count | IntSum | | Number of group rebalances.
client.consumer.record.queue.count | IntGauge | | Number of records in the consumer pre-fetch queue.
client.consumer.record.queue.bytes | IntGauge | | Amount of record memory in the consumer pre-fetch queue. This may also include per-record overhead.
client.consumer.record.application.count | IntSum | | Number of records consumed by the application.
client.consumer.record.application.bytes | IntSum | | Memory of records consumed by the application.
client.consumer.fetch.latency | IntGauge | | FetchRequest latency.
client.consumer.fetch.count | IntSum | | Total number of FetchRequests sent.
client.consumer.fetch.failures | IntSum | | Total number of FetchRequest failures.

Client topic-level Producer metrics

All metrics in this table have the labels topic, partition, and acks=all|none|leader; client.producer.partition.record.failures additionally has the ‘reason’ label.

Metric name | Type | Description
client.producer.partition.queue.bytes | IntGauge | Number of bytes queued on the partition queue.
client.producer.partition.queue.count | IntGauge | Number of records queued on the partition queue.
client.producer.partition.latency | DoubleGauge | Total produce record latency, from the application calling send()/produce() to the ack received from the broker.
client.producer.partition.queue.latency | DoubleGauge | Time between send()/produce() and the record being sent to the broker.
client.producer.partition.record.retries | IntSum | Number of ProduceRequest retries.
client.producer.partition.record.failures | IntSum | Number of records that permanently failed delivery. The ‘reason’ label is a short string representation of the reason, typically the name of a Kafka protocol error code, e.g., “RequestTimedOut”.
client.producer.partition.record.success | IntSum | Number of records that have been successfully produced.


The “partition” label should be “unassigned” for not yet partitioned messages, as they are not yet assigned to a partition queue.


Host metrics

These metrics are optional to implement.

Metric name | Type | Labels | Description
client.process.memory.bytes | IntGauge | | Current process/runtime memory usage (RSS, not virtual).
client.process.cpu.user.time | DoubleSum | | User CPU time used (seconds).
client.process.cpu.system.time | DoubleSum | | System CPU time used (seconds).
client.process.pid | IntGauge | | The process id. Can be used, in conjunction with client.host.name, to map multiple client instances to the same process. Only emitted if private metrics are enabled.

Standard client resource labels

Label name | Description
client_software_name | The client’s implementation name.
client_software_version | The client’s version.
client_instance_id | The generated CLIENT_INSTANCE_ID.
client_id | client.id
client_rack | client.rack (if configured)
group_id | group.id (consumer)
group_instance_id | group.instance.id (consumer)
group_member_id | Group member id (if any, consumer)
transactional_id | transactional.id (producer)
hostname | Hostname of the client machine. Only emitted if private metrics are enabled.
os | Operating system name, version, architecture, distro, etc. Only emitted if private metrics are enabled.
runtime | Runtime environment, e.g., the JVM, .NET runtime, Python version, etc. Only emitted if private metrics are enabled.

Broker-added labels

The following labels are added by the broker as metrics are received:

Label name | Description
client_source_address | The client connection’s source address.
client_source_port | The client connection’s source port.
principal | Client’s security principal. Content depends on the authentication method.
broker_id | Receiving broker’s node id.

Security

Since client metric subscriptions are primarily aimed at the infrastructure operator that is managing the Kafka cluster it should be sufficient to limit the config control operations to the CLUSTER resource. 

There will be no permission checks on the PushTelemetryRequest itself.

API Request | Resource | ACL
DescribeConfigs | CLUSTER | DESCRIBE, READ
AlterConfigs | CLUSTER | WRITE
PushTelemetryRequest | N/A | N/A

Tools

A wrapper tool around kafka-configs.sh that makes it easier to set up metric subscriptions.

...

Code Block
bash
bin/kafka-client-metrics.sh [arguments] --bootstrap-server <brokers>

List configured metrics:
	--list
	[--id <client-instance-id|prefix-match>]

Add metrics:
	--add
	 --id <client-instance-id|prefix-match>
	 --metric <metric-prefix>..
	 --interval-ms <interval>

Delete metrics:
	--delete
	 --id <client-instance-id|prefix-match>
	[--metric <metric-prefix>]..

Example:

# Subscribe to producer partition queue and memory usage
# metrics every 60s from all librdkafka clients.

$ kafka-client-metrics.sh --bootstrap-server localhost:9092 \
	--add \
	--id rdkafka \
	--metric org.apache.kafka/client.producer.partition. \
	--metric librdkafka/client.memory. \
	--interval-ms 60000

# The metrics themselves are not viewable with this CLI tool
# since the storage of metrics is plugin-dependent.



Example workflows

Example workflows showcasing how this feature may be used for proactive and reactive purposes.

Proactive monitoring

The Kafka cluster is configured to collect all standard metrics pushed by the client at an interval of 60 seconds. The metrics plugin forwards the collected metrics to an imaginary system that monitors a set of well-known metrics and triggers alarms when trends go out of profile.

...

This collection of information, along with the triggered metric, is sent to the incident management system for further investigation or aggregation, and provides enough information to identify who and where the client is run. Further action might be to contact the organization or team that matches the principal, transactional.id, source address, etc, for further investigation.

Reactive monitoring

The Kafka cluster configuration for metrics collection is irrelevant to this use-case, given that the proper metrics plugin is enabled. The metrics plugin is configured to write metrics to a topic. A support system with an interactive interface is reading from this metrics topic, and has an Admin client to configure the cluster with desired metrics subscriptions.

...

The operator removes the metrics subscription for the client.id, which causes the next PushTelemetryResponse to return an empty subscription causing the client to stop pushing the previously subscribed metrics.

Future work

Tracing and logging are outside the scope of this proposal, but the use of OpenTelemetry allows those two additional methods to be added later within the same semantics and nomenclature outlined by this proposal.


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

Dedicated Metrics coordinator based on client instance id

An earlier version of this proposal had the notion of a metrics coordinator, selected based on the client instance id. This metrics coordinator would be responsible for rate-limiting pushed metrics per client instance. To avoid each client having a connection to its metrics coordinator, the proposal suggested using the impersonation/request-forwarding functionality suggested in a contemporary design by Jason G.

Since this made the broker-side implementation more complex and did not provide fool-proof control of misbehaving clients (a rogue client could simply generate new instance ids on each request), it was decided that the value of this functionality was not on par with the implementation cost, and it was thus removed from the proposal.

JMX

Scales poorly, Java-specific, commonly mistaken for a trick bike.

Combine with Dynamic client configuration

The Push interface is well suited for propagating client configuration to clients, but to keep scope down we leave that out of this proposal. It should be straightforward to extend and repurpose the interface described in this proposal to also support dynamic client configuration, log/event retrieval, etc, at a later time.

Use native Kafka protocol framing for metrics

The Kafka protocol has flexible fields that would allow us to construct the metrics serialization with pure Kafka protocol fields, but this would duplicate the efforts of the OpenTelemetry specification where Kafka would have to maintain its own specification. On the broker side the native Kafka protocol metrics would need to be converted to OpenTelemetry (or other) format anyway.

...