...

  • New PushTelemetryRequest protocol request type.
  • Updated metrics receiver interface in the broker to accommodate the new semantics and format (TBD).
  • A new CLIENT_METRICS ConfigEntry resource type for configuring metrics subscriptions.
  • kafka-client-metrics.sh CLI script to configure and list client metrics subscriptions.

...

Code Block
PushTelemetryRequest {
	ClientInstanceId string 			// client.id + "-" + UUID4, unique for this client instance.
										// Must be set to Null on the first request, and to the
										// returned ClientInstanceId from the first response
										// for all subsequent requests to any broker.
	Terminating bool					// Client is terminating.
	ContentType string					// "application/x-protobuf;type=otlp+metrics07" or "application/x-protobuf+zstd;type=otlp+metrics07"
	Metrics binary						// Format specified by ContentType, possibly compressed.
}

PushTelemetryResponse {
	ThrottleTime int32					// Standard and metric-specific throttling
	ErrorCode int16						// Error code
	ClientInstanceId string				// Will be set to a generated unique id if the
										// request ClientInstanceId was Null, else
										// this field will be set to Null.
	AcceptedContentTypes Array[string]	// Accepted metric formats.
										// Only returned if Request.ContentType is null
										// or empty, or ErrorCode is set to
										// UnsupportedFormat(?).
										// Also includes the supported compression types.
	NextPushMs int32					// Delay until next PushTelemetry
	RequestedMetrics Array[string]		// Requested Metrics prefix string match.
										// Empty array: No metrics subscribed.
										// Array[0] empty string: All metrics subscribed.
										// Array[..]: prefix string match
}
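The RequestedMetrics rules above (empty array: nothing subscribed; an empty-string entry: everything subscribed; otherwise prefix match) can be sketched as a small helper. The class and method names are hypothetical and not part of the proposed protocol or client API. Treating any empty-string entry as "match all" falls out naturally from prefix matching, since every name starts with the empty string.

```java
import java.util.List;

// Hypothetical helper illustrating the RequestedMetrics matching rules of
// PushTelemetryResponse; not part of the proposed API.
final class RequestedMetricsMatcher {
    private RequestedMetricsMatcher() {}

    /**
     * Returns true if metricName is covered by the broker's RequestedMetrics list.
     * Every entry is treated as a name prefix; an empty-string entry therefore
     * matches all metrics, and an empty list matches none.
     */
    static boolean isSubscribed(List<String> requestedMetrics, String metricName) {
        for (String prefix : requestedMetrics) {
            if (metricName.startsWith(prefix)) {
                return true;
            }
        }
        return false; // empty list: no metrics subscribed
    }
}
```

With prefix matching, a single entry such as client.producer. would select every producer metric in the tables below.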

...

The ContentType in the client's PushTelemetryRequest must include a type parameter describing the data format, in this case otlp+metrics07 for OpenTelemetry protocol v0.7. This is needed since OTLP is not yet at v1 and may thus have backwards-incompatible changes.
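A receiver (or the broker) has to split a ContentType value into its base media type, optional compression suffix and required type parameter. The sketch below does this with plain string handling. The class name, the field names, and the rule that a trailing +codec suffix denotes compression are assumptions based on the example values in this section, not a specified parsing algorithm.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

// Hypothetical parser for ContentType values such as
// "application/x-protobuf;type=otlp+metrics07" or
// "application/x-protobuf+zstd;type=otlp+metrics07". Names are illustrative only.
final class TelemetryContentType {
    // Compression codecs Kafka already supports.
    static final Set<String> CODECS = Set.of("gzip", "snappy", "lz4", "zstd");

    final String mediaType;             // e.g. "application/x-protobuf"
    final Optional<String> compression; // e.g. Optional.of("zstd")
    final String type;                  // value of the required "type" parameter

    private TelemetryContentType(String mediaType, Optional<String> compression, String type) {
        this.mediaType = mediaType;
        this.compression = compression;
        this.type = type;
    }

    static TelemetryContentType parse(String contentType) {
        String[] parts = contentType.split(";");
        String base = parts[0].trim().toLowerCase(Locale.ROOT);

        // Look for the type=... parameter among the remaining parts.
        String type = null;
        for (int i = 1; i < parts.length; i++) {
            String[] kv = parts[i].trim().split("=", 2);
            if (kv.length == 2 && kv[0].trim().equalsIgnoreCase("type")) {
                type = kv[1].trim();
            }
        }
        if (type == null || type.isEmpty()) {
            throw new IllegalArgumentException("missing type parameter: " + contentType);
        }

        // Treat a trailing "+<codec>" on the media type as the compression codec.
        Optional<String> compression = Optional.empty();
        int plus = base.lastIndexOf('+');
        if (plus >= 0 && CODECS.contains(base.substring(plus + 1))) {
            compression = Optional.of(base.substring(plus + 1));
            base = base.substring(0, plus);
        }
        return new TelemetryContentType(base, compression, type);
    }
}
```

Rejecting values without a type parameter mirrors the requirement above that the format version always be stated explicitly.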

Broker metrics receiver

The broker will expose a plugin interface for client telemetry to be forwarded to or collected by external systems. In particular, since we already collect data in OpenTelemetry format, one goal is to make it possible for a plugin to forward metrics directly to an OpenTelemetry collector.

The existing MetricsReporter plugin interface was built for plugins to pull metrics from the broker, but does not lend itself to the model where the broker pushes metric samples to plugins.

To minimize configuration complexity, we propose reusing the existing metrics reporter configuration mechanisms, while allowing plugins to implement a new interface (trait) to indicate whether they support receiving client telemetry.

This also allows a plugin to reuse the existing labels passed through the MetricsContext (e.g. broker, cluster id, and configured labels) and add them to the OpenTelemetry resource labels as needed.


Code Block
import java.nio.ByteBuffer;
import java.util.Collection;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

/**
 * A {@link MetricsReporter} may implement this interface to indicate support for collecting client telemetry on the server side.
 */
@InterfaceStability.Evolving
public interface ClientTelemetry {

    /**
     * Called by the broker to create a ClientTelemetryReceiver instance.
     * This instance may be cached by the broker.
     *
     * This method will always be called after the initial call to
     * {@link MetricsReporter#contextChange(MetricsContext)}
     * on the MetricsReporter implementing this interface.
     *
     * @return the receiver the broker will hand client telemetry payloads to
     */
    ClientTelemetryReceiver clientReceiver();
}

@InterfaceStability.Evolving
public interface ClientTelemetryReceiver {
    /**
     * Called by the broker when a client reports telemetry. The associated request context can be used
     * by the plugin to retrieve additional client information such as client ids or endpoints.
     *
     * This method may be called from the request handling thread, and as such should avoid blocking.
     *
     * @param context the client request context for the corresponding PushTelemetryRequest api call
     * @param payload the encoded telemetry payload as sent by the client
     */
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);

    /**
     * Returns the list of accepted content types for client metric payloads supported by this receiver.
     * This method may be used by the broker to inform connecting clients of supported payload formats.
     *
     * <p>e.g.
     * <code>application/x-protobuf;type=otlp+metrics07</code>
     * </p>
     *
     * @return collection of supported MIME types
     */
    Collection<String> acceptedContentTypes();
}

@InterfaceStability.Evolving
public interface ClientTelemetryPayload {

    /** The ClientInstanceId of the reporting client. */
    String clientInstanceId();

    /** The payload ContentType, e.g. application/x-protobuf;type=otlp+metrics07. */
    String contentType();

    /** The metrics data, in the format given by contentType(). */
    ByteBuffer data();
}
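How a broker might fan a payload out to the configured receivers, and derive AcceptedContentTypes from them, can be sketched without any Kafka dependencies. SimplePayload and SimpleReceiver are hypothetical, trimmed-down stand-ins for ClientTelemetryPayload and ClientTelemetryReceiver (the AuthorizableRequestContext argument is dropped), and the dispatch policy shown is an assumption rather than the proposed broker behaviour.

```java
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, self-contained sketch of broker-side fan-out to telemetry receivers.
final class TelemetryDispatchSketch {

    // Stand-in for ClientTelemetryPayload.
    static final class SimplePayload {
        final String clientInstanceId;
        final String contentType;
        final ByteBuffer data;

        SimplePayload(String clientInstanceId, String contentType, ByteBuffer data) {
            this.clientInstanceId = clientInstanceId;
            this.contentType = contentType;
            this.data = data;
        }
    }

    // Stand-in for ClientTelemetryReceiver, without the request context.
    interface SimpleReceiver {
        void exportMetrics(SimplePayload payload);

        Collection<String> acceptedContentTypes();
    }

    // Example receiver that only counts the bytes it is handed. A real plugin would
    // forward the data (e.g. to an OpenTelemetry collector) without blocking.
    static final class ByteCountingReceiver implements SimpleReceiver {
        private final List<String> accepted;
        long bytesReceived = 0;

        ByteCountingReceiver(List<String> accepted) {
            this.accepted = accepted;
        }

        @Override
        public void exportMetrics(SimplePayload payload) {
            bytesReceived += payload.data.remaining();
        }

        @Override
        public Collection<String> acceptedContentTypes() {
            return accepted;
        }
    }

    // Union of every receiver's content types: one possible source for
    // PushTelemetryResponse.AcceptedContentTypes.
    static Set<String> acceptedContentTypes(List<? extends SimpleReceiver> receivers) {
        Set<String> all = new LinkedHashSet<>();
        for (SimpleReceiver receiver : receivers) {
            all.addAll(receiver.acceptedContentTypes());
        }
        return all;
    }

    // Passes the payload to each receiver that accepts its content type and returns how
    // many receivers got it. Each receiver gets its own buffer view (duplicate()), so a
    // receiver that reads the buffer does not move the position seen by the others.
    static int dispatch(List<? extends SimpleReceiver> receivers, SimplePayload payload) {
        int delivered = 0;
        for (SimpleReceiver receiver : receivers) {
            if (receiver.acceptedContentTypes().contains(payload.contentType)) {
                receiver.exportMetrics(new SimplePayload(
                        payload.clientInstanceId, payload.contentType, payload.data.duplicate()));
                delivered++;
            }
        }
        return delivered;
    }
}
```

A return value of 0 from dispatch is the case in which a broker could answer with an error such as the UnsupportedFormat(?) code in the response schema, together with the accepted content types.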


Client API

Retrieve broker-generated client instance id:

...

  • PushTelemetryRequest|Response - protocol request used by the client to send metrics to any broker it is connected to. The response is used to control push frequency, what metrics the client should push, etc.
  • Standardised metrics - a set of standardised metrics that all supporting clients should provide.
  • AdminAPI config - the AdminAPI configuration interface with a new CLIENT_METRICS resource type is used to manage metrics subscriptions.
  • Client metrics plugin / extending the MetricsReporter interface - a broker plugin interface that acts on the received metrics. This plugin will typically forward the metrics to a time series database. It is recommended that any broker-side processing be kept to a minimum for scalability and performance reasons.

...

The OpenTelemetry specification will be used as the metrics serialization format as well as to define how metrics are constructed when it comes to naming, metric type, etc.

See the OpenTelemetry metrics specification for more information.

Encoding

While the protocol-level request will allow for other metrics formats, Kafka will only ship with OpenTelemetry support, and any client that claims support for this KIP must use OpenTelemetry by default as well.

Metric payloads are encoded as OpenTelemetry ExportMetricsServiceRequest protobuf objects.

Naming

Metrics will be named in a hierarchical format:

...

The PushTelemetryRequest.ContentType must then be set to application/x-protobuf+zstd;type=otlp+metrics07 (or the equivalent for any of the other Kafka-supported compression codecs: gzip, snappy, lz4, zstd).

Tests indicate that a compression ratio up to 10x is possible for the standard metrics.

Decompression of the payloads would be handled by the broker prior to forwarding the data to the plugins.
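The compression round trip can be sketched with gzip, the one Kafka-supported codec available in the JDK. The serialized ExportMetricsServiceRequest is treated as opaque bytes, and the class, method and constant names are hypothetical; zstd, snappy and lz4 would need third-party libraries and are not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of client-side compression and broker-side decompression.
final class TelemetryCompression {
    // ContentType a client would send for a gzip-compressed payload.
    static final String GZIP_CONTENT_TYPE = "application/x-protobuf+gzip;type=otlp+metrics07";

    // Client side: compress the serialized protobuf before setting PushTelemetryRequest.Metrics.
    static byte[] compress(byte[] serialized) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(serialized);
        } // closing the stream writes the gzip trailer
        return out.toByteArray();
    }

    // Broker side: decompress before handing the payload to the metrics plugins.
    static byte[] decompress(byte[] compressed) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes();
        }
    }
}
```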

Client behaviour

A client that supports this metric interface and identifies a supporting broker (by detecting at least PushTelemetryRequestV0 in the ApiVersionResponse) will start off, after a short delay, by sending an empty PushTelemetryRequest with the ClientInstanceId field set to Null to one randomly selected connected broker. The response provides the client instance id, the desired push interval, the subscribed push metrics, and the metrics formats the broker supports. This is effectively a metrics handshake, and it is performed each time a client selects a new broker to send metrics to.
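The client-side state implied by this handshake (a Null instance id until the first response, then a fixed id, a push schedule driven by NextPushMs, and the current subscription) can be sketched as follows. All names are hypothetical, Response is a trimmed stand-in for PushTelemetryResponse, and error handling, throttling and broker selection are deliberately left out.

```java
import java.util.List;

// Hypothetical client-side bookkeeping for the metrics handshake and push schedule.
final class TelemetryClientState {

    // Trimmed stand-in for PushTelemetryResponse.
    static final class Response {
        final String clientInstanceId;      // non-null only if the request's id was Null
        final int nextPushMs;
        final List<String> requestedMetrics;

        Response(String clientInstanceId, int nextPushMs, List<String> requestedMetrics) {
            this.clientInstanceId = clientInstanceId;
            this.nextPushMs = nextPushMs;
            this.requestedMetrics = requestedMetrics;
        }
    }

    private String clientInstanceId;                   // Null until the handshake completes
    private List<String> requestedMetrics = List.of(); // empty array: no metrics subscribed
    private long nextPushAtMs;

    // The first (empty) PushTelemetryRequest is sent after a short initial delay.
    TelemetryClientState(long startMs, long initialDelayMs) {
        this.nextPushAtMs = startMs + initialDelayMs;
    }

    // Value for PushTelemetryRequest.ClientInstanceId: Null for the handshake,
    // then the broker-assigned id for every later request, to any broker.
    String requestClientInstanceId() {
        return clientInstanceId;
    }

    boolean shouldPush(long nowMs) {
        return nowMs >= nextPushAtMs;
    }

    void onResponse(Response response, long nowMs) {
        if (clientInstanceId == null && response.clientInstanceId != null) {
            clientInstanceId = response.clientInstanceId; // kept for the client's lifetime
        }
        requestedMetrics = response.requestedMetrics;
        nextPushAtMs = nowMs + response.nextPushMs;
    }

    List<String> requestedMetrics() {
        return requestedMetrics;
    }
}
```

Keeping the instance id outside any per-broker state is what lets the client reuse it when it later switches to a different broker.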

...

The metric types in the following tables correspond to the OpenTelemetry v1 metrics protobuf message types. A short summary:

  • Sum - Monotonic total count meter (Counter). Suitable for total number of X counters, e.g., total number of bytes sent.
  • Gauge - Non-monotonic current value meter (UpDownCounter). Suitable for current value of Y, e.g., current queue count.
  • Histogram - Value distribution meter (ValueRecorder). Suitable for latency values, etc.

...

Metric name

Type

Labels

Description

client.connection.creations

Sum

FIXME: with broker_id label?

Total number of broker connections made.

client.connection.count

Gauge


Current number of broker connections.

client.connection.errors

Sum

reason

Total number of broker connection failures. Label ‘reason’ indicates the reason: 

disconnect - remote peer closed the connection.

auth - authentication failure.

TLS - TLS failure.

timeout - client request timeout.

close - client closed the connection.

client.request.rtt

Gauge

broker_id

Average request latency (round-trip time to the broker and back).

client.request.queue.latency

Gauge

broker_id

Average request queue latency waiting for request to be sent to broker.

client.request.queue.count

Gauge

broker_id

Number of requests in queue waiting to be sent to broker.

client.request.success

Sum

broker_id

Number of successful requests to broker, that is, requests where a response is received without a request-level error (there may still be per-sub-resource errors, e.g., errors for certain partitions within an OffsetCommitResponse).

client.request.errors

Sum

broker_id

reason

Number of failed requests.

Label ‘reason’ indicates the reason:

timeout - client timed out the request.

disconnect - broker connection was closed before the response could be received.

error - request-level protocol error.

client.io.wait.time

Gauge


Amount of time waiting for socket I/O. FIXME: histogram? Avg? Total?

Should this be for POLLOUT only?
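The difference between the Sum and Gauge types in this table can be illustrated with three of the connection metrics: client.connection.creations only ever grows, client.connection.count rises and falls, and client.connection.errors keeps one series per reason label value. The class and its methods are hypothetical client-side bookkeeping, and counting every disconnect (including reason close) as an error follows the reason list in the table rather than any implemented client.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical bookkeeping for three connection metrics from the table above.
final class ConnectionMetrics {
    private long creations; // client.connection.creations (Sum): only ever increases
    private long open;      // client.connection.count (Gauge): rises and falls
    // client.connection.errors (Sum), one series per 'reason' label value
    private final Map<String, Long> errorsByReason = new TreeMap<>();

    void onConnected() {
        creations++;
        open++;
    }

    // reason is one of the table's values: disconnect, auth, TLS, timeout, close
    void onDisconnected(String reason) {
        open--;
        errorsByReason.merge(reason, 1L, Long::sum);
    }

    long creations() { return creations; }

    long count() { return open; }

    long errors(String reason) { return errorsByReason.getOrDefault(reason, 0L); }
}
```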

...

Metric name

Type

Labels

Description

client.producer.record.bytes

Gauge


Total amount of record memory currently in use by the producer. This includes the record fields (key, value, etc.) as well as any implementation-specific overhead (objects, etc.).

client.producer.record.count

Gauge


Total number of records currently handled by the producer.

client.producer.queue.max.bytes

Gauge


Total amount of queue/buffer memory allowed on the producer queue(s).

client.producer.queue.bytes

Gauge


Current amount of memory used in producer queues.

client.producer.queue.max.messages

Gauge


Maximum number of messages allowed on the producer queue(s).

client.producer.queue.messages

Gauge


Current number of messages on the producer queue(s).

...

Metric name

Type

Labels

Description

client.consumer.poll.interval

Gauge FIXME


The interval at which the application calls poll(), in seconds.

client.consumer.poll.last

Gauge


The number of seconds since the last poll() invocation.

client.consumer.poll.latency

Gauge


The time it takes poll() to return a new message to the application.

client.consumer.commit.count

Sum


Number of commit requests sent.

client.consumer.group.assignment.partition.count

Gauge


Number of partitions currently assigned to this consumer by the group leader.

client.consumer.assignment.partition.count

Gauge


Number of partitions currently assigned to this consumer, either through the group protocol or through assign().

client.consumer.group.rebalance.count

Sum


Number of group rebalances.

client.consumer.record.queue.count

Gauge


Number of records in consumer pre-fetch queue.

client.consumer.record.queue.bytes

Gauge


Amount of record memory in consumer pre-fetch queue. This may also include per-record overhead.

client.consumer.record.application.count

Sum


Number of records consumed by application.

client.consumer.record.application.bytes

Sum


Amount of record memory consumed by the application.

client.consumer.fetch.latency

Gauge


FetchRequest latency.

client.consumer.fetch.count

Count


Total number of FetchRequests sent.

client.consumer.fetch.failures

Count


Total number of FetchRequest failures.
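The two poll() timing gauges, client.consumer.poll.interval and client.consumer.poll.last, can both be derived from the timestamp of each poll() call. The sketch below is a hypothetical tracker; reporting the most recent interval (rather than an average over the push period) is an assumption, since the table does not define the aggregation. Timestamps are passed in explicitly, e.g. from System.nanoTime(), to keep the logic testable.

```java
// Hypothetical tracker for client.consumer.poll.interval and client.consumer.poll.last
// (both Gauges, in seconds).
final class PollTimingMetrics {
    private boolean polled = false;
    private long lastPollNanos;
    private double intervalSeconds = 0.0;

    // Call at the start of every poll() invocation.
    void onPoll(long nowNanos) {
        if (polled) {
            intervalSeconds = (nowNanos - lastPollNanos) / 1e9;
        }
        lastPollNanos = nowNanos;
        polled = true;
    }

    // client.consumer.poll.interval: time between the two most recent poll() calls.
    double pollIntervalSeconds() {
        return intervalSeconds;
    }

    // client.consumer.poll.last: NaN until poll() has been called at least once.
    double secondsSinceLastPoll(long nowNanos) {
        return polled ? (nowNanos - lastPollNanos) / 1e9 : Double.NaN;
    }
}
```

A boolean flag is used instead of a sentinel timestamp because System.nanoTime() values may be negative.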

...

Metric name

Type

Labels

Description

client.producer.partition.queue.bytes

Gauge

topic

partition

acks=all|none|leader

Number of bytes queued on partition queue.

client.producer.partition.queue.count

Gauge

topic

partition

acks=all|none|leader

Number of records queued on partition queue.

client.producer.partition.latency

Gauge

topic

partition

acks=all|none|leader

Total produce record latency, from application calling send()/produce() to ack received from broker.

client.producer.partition.queue.latency

Gauge

topic

partition

acks=all|none|leader

Time between send()/produce() and record being sent to broker.

client.producer.partition.record.retries

Sum

topic

partition

acks=all|none|leader

Number of ProduceRequest retries.

client.producer.partition.record.failures

Sum

topic

partition

acks=all|none|leader

reason

Number of records that permanently failed delivery. Reason is a short string representation of the reason, which is typically the name of a Kafka protocol error code, e.g., “RequestTimedOut”.

client.producer.partition.record.success

Sum

topic

partition

acks=all|none|leader

Number of records that have been successfully produced.

...

Metric name

Type

Labels

Description

client.process.memory.bytes

Gauge


Current process/runtime memory usage (RSS, not virtual).

client.process.cpu.user.time

Sum


User CPU time used (seconds).

client.process.cpu.system.time

Sum


System CPU time used (seconds).

client.process.pid

Gauge


The process id. Can be used, in conjunction with client.host.name, to map multiple client instances to the same process.

Only emitted if private metrics are enabled.

...

The Kafka protocol has flexible fields that would allow us to construct the metrics serialization with pure Kafka protocol fields, but this would duplicate the efforts of the OpenTelemetry specification where Kafka would have to maintain its own specification. On the broker side the native Kafka protocol metrics would need to be converted to OpenTelemetry (or other) format anyway.

The OpenTelemetry serialization format is based entirely on Protobuf and should thus be generally available across all client implementation languages.

...