...

While the broker already tracks request-level metrics for connected clients, there is a gap in end-to-end monitoring when it comes to visibility of client internals: queue sizes, internal latencies, error counts, application behaviour (such as message processing rate), and so on. These are Kafka client metrics, not application metrics.

This proposal aims to provide a generic and standardised interface through which a cluster operator or an external system may request, and a client can push, client-side metrics to the broker; a plugin interface for how those metrics are handled by the broker; and a minimum set of standard pre-defined metrics that a Kafka client should expose through this interface. A supporting client should have metrics enabled by default, require no extra configuration, and provide at least the standardised metrics outlined later in this proposal.

...

Code Block
PushTelemetryRequest {
	ClientInstanceId string 			// client.id + "-" + UUID4 unique for this client instance.
										// Must be set to Null on the first request, and to the
										// returned ClientInstanceId from the first response
										// for all subsequent requests to any broker.
	Terminating bool					// Client is terminating.
	ContentType string					// "application/x-protobuf;type=otlp+metrics09" or "application/x-protobuf+zstd;type=otlp+metrics09"
	Metrics binary						// Format specified by ContentType, possibly compressed.
}

PushTelemetryResponse {
	ThrottleTime int32					// Standard and metric-specific throttling
	ErrorCode int16						// Error code
	ClientInstanceId string				// Will be set to a generated unique id if the
										// request ClientInstanceId was Null, else
										// this field will be set to Null.
	AcceptedContentTypes Array[string]	// Accepted metric formats.
                                       	// Only returned if Request.ContentType is null
								      	// or empty, or ErrorCode is set to
								      	// UnsupportedFormat(?).
										// Also includes the supported compression types.
										// Ordered by preference (most suitable type first).
	NextPushMs int32					// Delay until next PushTelemetry
	RequestedMetrics Array[string]		// Requested Metrics prefix string match.
										// Empty array: No metrics subscribed.
										// Array[0] empty string: All metrics subscribed.
										// Array[..]: prefix string match
}
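
For illustration, the following sketch shows how a client might use these fields: ClientInstanceId is Null on the first push and thereafter set to the broker-assigned id, and the broker controls the push cadence through NextPushMs. The record types and helper methods below are stand-ins, not actual client classes.

Code Block
java
import java.util.List;

// Minimal, self-contained sketch of the ClientInstanceId handshake and
// NextPushMs scheduling described above. The record types and sendPush()
// are illustrative stand-ins for whatever the client implementation uses.
public class TelemetryPushLoopSketch {

	record PushTelemetryRequest(String clientInstanceId, boolean terminating,
								String contentType, byte[] metrics) {}

	record PushTelemetryResponse(int throttleTimeMs, short errorCode,
								 String clientInstanceId, int nextPushMs,
								 List<String> requestedMetrics) {}

	static String clientInstanceId = null;   // Must be Null on the very first request.

	static void pushLoop() throws InterruptedException {
		int nextPushMs = 0;
		while (true) {
			Thread.sleep(nextPushMs);
			PushTelemetryRequest req = new PushTelemetryRequest(
					clientInstanceId, false,
					"application/x-protobuf;type=otlp+metrics09",
					collectSubscribedMetrics());
			PushTelemetryResponse resp = sendPush(req);
			if (resp.clientInstanceId() != null)
				clientInstanceId = resp.clientInstanceId();  // Assigned once, reused for all brokers.
			nextPushMs = resp.nextPushMs();                  // Broker-controlled push interval.
		}
	}

	// Stand-ins: the real client serializes OTLP metrics matching the
	// subscribed prefixes and sends the request over its broker connection.
	static byte[] collectSubscribedMetrics() { return new byte[0]; }
	static PushTelemetryResponse sendPush(PushTelemetryRequest req) {
		return new PushTelemetryResponse(0, (short) 0, "myclient-UUID4", 60_000, List.of(""));
	}
}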

...

Error code: ..AuthorizationFailed
Reason: Client is not permitted to send metrics.
Action: Raise a non-fatal error to the application and schedule the next PushTelemetryRequest in 30 minutes.

Error code: UnsupportedEncoding
Reason: Broker does not support the client's metric encoding type.
Action: Raise a non-fatal error to the application and schedule the next PushTelemetryRequest in 30 minutes.

Error code: InvalidRecord
Reason: Broker failed to decode or validate the client's encoded metrics.
Action: Raise a non-fatal error to the application and schedule the next PushTelemetryRequest in 5 minutes.

Error code: FunctionalityNotEnabled
Reason: Client sent a PushTelemetryRequest to a broker which does not have a client metrics receiver plugin configured. This is due to a faulty client that does not respect the ApiVersionsResponse, which in this case should not include PushTelemetry.
Action: Raise a non-fatal error to the application and schedule the next PushTelemetryRequest in 30 minutes, on another broker.

Error code: RateLimited
Reason: Client sent a non-terminating push request before the NextPushMs time had expired.
Action: The response will include only the NextPushMs, which the client will use to adjust its next push time.

Error code: UnsupportedCompressionType
Reason: Client's compression type is not supported by the broker.
Action: The client should switch to another compression type. When all compression types are exhausted, the client should fall back to uncompressed format.
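
For illustration only, a client-side sketch of how these actions could map to the next push attempt; the enum below is a stand-in, not the protocol's actual error-code values:

Code Block
java
import java.util.concurrent.TimeUnit;

// Illustrative mapping of PushTelemetryResponse error codes to the client's
// next push delay, following the table above. The enum stands in for the
// real protocol error codes.
public class PushErrorHandlingSketch {

	enum PushError { NONE, AUTHORIZATION_FAILED, UNSUPPORTED_ENCODING,
					 INVALID_RECORD, FUNCTIONALITY_NOT_ENABLED,
					 RATE_LIMITED, UNSUPPORTED_COMPRESSION_TYPE }

	static long nextPushDelayMs(PushError error, long nextPushMsFromResponse) {
		switch (error) {
			case AUTHORIZATION_FAILED:
			case UNSUPPORTED_ENCODING:
			case FUNCTIONALITY_NOT_ENABLED:   // Also: try another broker next time.
				return TimeUnit.MINUTES.toMillis(30);  // Non-fatal; raise to the application.
			case INVALID_RECORD:
				return TimeUnit.MINUTES.toMillis(5);   // Non-fatal; raise to the application.
			case RATE_LIMITED:
				return nextPushMsFromResponse;         // Adjust to the broker's NextPushMs.
			case UNSUPPORTED_COMPRESSION_TYPE:
				// Switch compression type (eventually uncompressed) and keep the schedule.
				return nextPushMsFromResponse;
			default:
				return nextPushMsFromResponse;
		}
	}
}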

...

If there is no client metrics receiver plugin configured on the broker, the broker must not include the PushTelemetryRequest API in its ApiVersionsResponse, and it will respond to any unsolicited PushTelemetryRequest with PushTelemetryResponse.ErrorCode set to FunctionalityNotEnabled.


In this case the client should not attempt another PushTelemetryRequest until at least 30 minutes have passed; this allows the metrics receiver to be enabled or disabled without having to restart the broker or reset the client connection.

PushTelemetryRequest handling

Validation

Validation of the encoded metrics is the task of the ClientMetricsReceiver. If the encoding type is unsupported, the response will be returned with ErrorCode set to UnsupportedEncoding. Should decoding or validation of the binary metrics blob fail, the ErrorCode will be set to InvalidRecord.
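
For illustration, a rough sketch of this broker-side flow; the ClientMetricsReceiver interface and the numeric codes shown here are hypothetical stand-ins, not the plugin API defined by this proposal:

Code Block
java
// Hypothetical sketch of how the broker could map receiver-side validation
// failures to the response error codes described above.
public class PushTelemetryValidationSketch {

	interface ClientMetricsReceiver {
		boolean supportsContentType(String contentType);
		// Decodes and validates the metrics blob; throws if it cannot.
		void exportMetrics(String contentType, byte[] metrics) throws Exception;
	}

	// Placeholder error codes; the real values are assigned by the protocol.
	static final short NONE = 0, UNSUPPORTED_ENCODING = 1, INVALID_RECORD = 2;

	static short handlePush(ClientMetricsReceiver receiver, String contentType, byte[] metrics) {
		if (!receiver.supportsContentType(contentType))
			return UNSUPPORTED_ENCODING;
		try {
			receiver.exportMetrics(contentType, metrics);   // Decode + validate.
		} catch (Exception decodeOrValidationFailure) {
			return INVALID_RECORD;
		}
		return NONE;
	}
}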

...

If the receiving broker has no prior metrics state for a client instance id but has at least one matching metrics subscription, the first returned NextPushMs is a randomised value between 0 and the lowest metrics interval of the subscription. This distributes the client metrics pushes in case a large number of clients are started simultaneously.
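
A minimal sketch of this jitter, assuming the broker knows the lowest interval among the matching subscriptions:

Code Block
java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the randomised first NextPushMs described above: with no prior
// metrics state, pick a value between 0 and the lowest subscribed interval
// so that simultaneously started clients do not push in lockstep.
public class FirstPushJitterSketch {
	static int firstNextPushMs(boolean hasPriorState, int lowestSubscribedIntervalMs) {
		if (hasPriorState)
			return lowestSubscribedIntervalMs;   // Normal schedule.
		return ThreadLocalRandom.current().nextInt(lowestSubscribedIntervalMs + 1);
	}
}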


Request and response size estimations / FIXME / WIP

A producer's typical static (e.g., standard collecting, non-troubleshooting) subscription set may look something like:

  • org.apache.kafka/client.producer.   // prefix match
  • org.apache.kafka/client.connection.errors
  • org.apache.kafka/client.request.errors

With a recommended collection interval of 60 seconds.

This gives a PushTelemetryResponse size of roughly respHdr(8) + respFields(20) + clientId(~60) + subscriptionStrings(36 + 43 + 41) = 208 bytes, every 60 seconds, per connected producer client per cluster.

Depending on the amount of metrics the producer implementation provides for the given prefixes, the PushTelemetryRequest with the above metrics would yield FIXME(count it), compressed down to FIXME, every 60 seconds, per connected producer client per cluster.  FIXME: For now, 1000 bytes. Compression ratio 8x.

10000 producers with these metrics subscribed in a 12-broker cluster yield the following numbers (a rough derivation is sketched after the list):

  • 14 PushTelemetryRequests/second per broker
  • 15 kb/s requests / broker
  • 3 kb/s responses / broker
  • 112 kb/s uncompressed OTLP metrics provided to metrics plugin per broker. 1.3MB/s per cluster.
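
A rough back-of-the-envelope check of where these figures come from, using the placeholder values above (1000-byte compressed requests, 8x compression ratio, ~208-byte responses):

Code Block
java
// Back-of-the-envelope check of the per-broker load figures above:
// 10,000 producers, 60 s push interval, 12 brokers, ~1,000 B compressed
// requests (8x compression ratio) and ~208 B responses.
public class PushLoadEstimateSketch {
	public static void main(String[] args) {
		double pushesPerSecPerBroker = 10_000.0 / 60 / 12;                // ~13.9
		double requestBytesPerSec    = pushesPerSecPerBroker * 1_000;     // ~14 KB/s
		double responseBytesPerSec   = pushesPerSecPerBroker * 208;       // ~2.9 KB/s
		double otlpBytesPerSec       = pushesPerSecPerBroker * 1_000 * 8; // ~111 KB/s
		double otlpClusterMBPerSec   = otlpBytesPerSec * 12 / 1_000_000;  // ~1.3 MB/s
		System.out.printf("%.1f pushes/s, %.0f B/s requests, %.0f B/s responses,%n"
				+ "%.0f B/s OTLP per broker, %.2f MB/s OTLP per cluster%n",
				pushesPerSecPerBroker, requestBytesPerSec, responseBytesPerSec,
				otlpBytesPerSec, otlpClusterMBPerSec);
	}
}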


Metrics subscription

Metrics subscriptions are configured through the standard Kafka Admin API configuration interface using the new resource type CLIENT_METRICS. The resource name is a prefix match of the client instance id (which, as stated previously, is a concatenation of the client.id and a unique UUID string). The configuration entry name is a prefix-matched metric name (e.g., "client.producer.partition.") and the configuration value is the desired collection/push interval in milliseconds.


Example:

Code Block
bash
$ kafka-configs.sh --bootstrap-server $BROKERS \
	--entity-type client_metrics \
	--entity-name 'rdkafka' \
	--alter \
	--add-config "org.apache.kafka/client.producer.partition.queue.=15000"

...


As the client will not know the broker id of its bootstrap servers, the broker_id label should be set to the negative index of the broker in the client's bootstrap.servers list, starting at -1.
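
A small sketch of this labelling rule (the helper below is hypothetical, for illustration only):

Code Block
java
import java.util.List;

// Sketch of the broker_id labelling rule for bootstrap connections described
// above: bootstrap brokers have no known broker id, so they are labelled with
// their negative index in bootstrap.servers, starting at -1.
public class BootstrapBrokerIdLabelSketch {
	static String brokerIdLabel(List<String> bootstrapServers, String hostPort) {
		int idx = bootstrapServers.indexOf(hostPort);
		return idx >= 0 ? Integer.toString(-(idx + 1)) : null;  // -1, -2, ...
	}

	public static void main(String[] args) {
		List<String> servers = List.of("broker1:9092", "broker2:9092");
		System.out.println(brokerIdLabel(servers, "broker2:9092"));  // prints -2
	}
}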


Client instance-level Producer metrics

...

API Request           Resource   ACL
DescribeConfigs       CLUSTER    DESCRIBE, READ
AlterConfigs          CLUSTER    WRITE
PushTelemetryRequest  N/A        N/A


See the Private metrics chapter above for privacy/integrity.

Tools

A wrapper tool around kafka-configs.sh that makes it easier to set up metric subscriptions.


Code Block
bash
bin/kafka-client-metrics.sh [arguments] --bootstrap-server <brokers>

List configured metrics:
	--list
	[--id <client-instance-id> | --id-prefix <prefix-match>]

Add metrics:
	--add
	--id <client-instance-id> | --id-prefix <prefix-match>
	--metric <metric-prefix>..
	--interval-ms <interval>

Delete metrics:
	--delete
	--id <client-instance-id> | --id-prefix <prefix-match>
	[--metric <metric-prefix>]..

Example:

# Subscribe to producer partition queue and memory usage
# metrics every 60s from all librdkafka clients.

$ kafka-client-metrics.sh --bootstrap-server localhost:9092 \
	--add \
	--id-prefix rdkafka \
	--metric org.apache.kafka/client.producer.partition. \
	--metric librdkafka.client.memory. \
	--interval-ms 60000

# Delete a metric subscription for a specific client instance:

$ kafka-client-metrics.sh --bootstrap-server localhost:9092 \
	--delete \
	--id rdkafka-33E53F93-C6D0-4136-A4A7-6308050AFFF3 \
	--metric org.apache.kafka/client.request.errors

# Subscribe to all metrics by providing an empty metric.

$ kafka-client-metrics.sh --bootstrap-server localhost:9092 \
	--add \
	--id rdkafka-33E53F93-C6D0-4136-A4A7-6308050AFFF3 \
	--metric '' \
	--interval-ms 120000

# The metrics themselves are not viewable with this CLI tool
# since the storage of metrics is plugin-dependent.

...