...
- New PushTelemetryRequest protocol request type.
- Updated metrics receiver interface in the broker to accommodate the new semantics and format (TBD).
- A new CLIENT_METRICS ConfigEntry resource type for configuring metrics subscriptions.
- kafka-client-metrics.sh CLI script to configure and list client metrics subscriptions.
...
```
PushTelemetryRequest {
  ClientInstanceId string   // client.id + "-" + UUID4 unique for this client instance.
                            // Must be set to Null on the first request, and to the
                            // returned ClientInstanceId from the first response
                            // for all subsequent requests to any broker.
  Terminating bool          // Client is terminating.
  ContentType string        // "application/x-protobuf;type=otlp+metrics07" or
                            // "application/x-protobuf+zstd;type=otlp+metrics07"
  Metrics binary            // Format specified by ContentType, possibly compressed.
}

PushTelemetryResponse {
  ThrottleTime int32        // Standard and metric-specific throttling
  ErrorCode int16           // Error code
  ClientInstanceId string   // Will be set to a generated unique id if the
                            // request ClientInstanceId was Null, else
                            // this field will be set to Null.
  AcceptedContentTypes Array[string]  // Accepted metric formats.
                            // Only returned if Request.ContentType is null
                            // or empty, or ErrorCode is set to
                            // UnsupportedFormat(?).
                            // Also includes the supported compression types.
  NextPushMs int32          // Delay until next PushTelemetry
  RequestedMetrics Array[string]  // Requested Metrics prefix string match.
                            // Empty array: No metrics subscribed.
                            // Array[0] empty string: All metrics subscribed.
                            // Array[..]: prefix string match
}
```
...
The ContentType in the client's PushTelemetryRequest must include a type parameter describing the data format, in this case otlp+metrics07 for OpenTelemetry protocol v0.7. This is needed since OTLP is not yet at v1 and may thus have backwards-incompatible changes.
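As an illustration, the type parameter can be extracted from the ContentType string with a small parser. ContentTypeParser is a hypothetical helper, not part of the KIP:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal, illustrative parser for the ContentType string, extracting
// parameters such as type=otlp+metrics07 from the MIME-style value.
class ContentTypeParser {
    static Map<String, String> parameters(String contentType) {
        Map<String, String> params = new HashMap<>();
        String[] parts = contentType.split(";");
        // parts[0] is the media type itself; the rest are key=value parameters.
        for (int i = 1; i < parts.length; i++) {
            String[] kv = parts[i].trim().split("=", 2);
            if (kv.length == 2) params.put(kv[0], kv[1]);
        }
        return params;
    }
}
```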
Broker metrics receiver
...
The broker will expose a plugin interface for client telemetry to be forwarded to or collected by external systems. In particular, since we already collect data in OpenTelemetry format, one goal is to make it possible for a plugin to forward metrics directly to an OpenTelemetry collector.
The existing MetricsReporter plugin interface was built for plugins to pull metrics from the broker, but does not lend itself to the model where the broker pushes metric samples to plugins.
In order to minimize the complexity of additional configuration mechanisms, we are proposing to reuse the existing metrics reporter configuration mechanisms, but allow plugins to implement a new interface (trait) to indicate if they support receiving client telemetry.
This also allows a plugin to reuse the existing labels passed through the MetricsContext (e.g. broker, cluster id, and configured labels) and add them to the OpenTelemetry resource labels as needed.
```java
/**
 * A {@link MetricsReporter} may implement this interface to indicate support for
 * collecting client telemetry on the server side.
 */
@InterfaceStability.Evolving
public interface ClientTelemetry {
    /**
     * Called by the broker to create a ClientTelemetryReceiver instance.
     * This instance may be cached by the broker.
     *
     * This method will always be called after the initial call to
     * {@link MetricsReporter#contextChange(MetricsContext)}
     * on the MetricsReporter implementing this interface.
     *
     * @return the ClientTelemetryReceiver instance
     */
    ClientTelemetryReceiver clientReceiver();
}

@InterfaceStability.Evolving
public interface ClientTelemetryReceiver {
    /**
     * Called by the broker when a client reports telemetry. The associated request
     * context can be used by the plugin to retrieve additional client information
     * such as client ids or endpoints.
     *
     * This method may be called from the request handling thread, and as such
     * should avoid blocking.
     *
     * @param context the client request context for the corresponding PushTelemetryRequest api call
     * @param payload the encoded telemetry payload as sent by the client
     */
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);

    /**
     * Returns the list of accepted content types for client metric payloads supported
     * by this receiver. This method may be used by the broker to inform connecting
     * clients of supported payload formats.
     *
     * <p>e.g.
     * <code>application/x-protobuf;type=otlp+metrics07</code>
     * </p>
     *
     * @return collection of supported mime types
     */
    Collection<String> acceptedContentTypes();
}

@InterfaceStability.Evolving
public interface ClientTelemetryPayload {
    String clientInstanceId();
    String contentType();
    ByteBuffer data();
}
```
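A minimal sketch of a receiver plugin might look as follows. The stub interfaces and the OtlpForwardingReceiver class are illustrative stand-ins only (a real plugin would implement the KIP interfaces above, take an AuthorizableRequestContext, and forward to an OpenTelemetry collector); the non-blocking hand-off reflects the request-thread caveat in the Javadoc.

```java
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-ins for the KIP interfaces, so this sketch compiles on its own.
interface ClientTelemetryPayload {
    String clientInstanceId();
    String contentType();
    ByteBuffer data();
}

interface ClientTelemetryReceiver {
    void exportMetrics(Object context, ClientTelemetryPayload payload);
    Collection<String> acceptedContentTypes();
}

// Hypothetical receiver that buffers payloads for asynchronous forwarding
// (e.g., to an OpenTelemetry collector) instead of doing I/O on the
// broker's request handler thread.
class OtlpForwardingReceiver implements ClientTelemetryReceiver {
    private final BlockingQueue<ClientTelemetryPayload> queue = new LinkedBlockingQueue<>(1000);

    @Override
    public void exportMetrics(Object context, ClientTelemetryPayload payload) {
        // Called from a request handling thread: never block, just hand off.
        // offer() drops the sample on overflow rather than stalling the broker.
        queue.offer(payload);
    }

    @Override
    public Collection<String> acceptedContentTypes() {
        return List.of("application/x-protobuf;type=otlp+metrics07");
    }

    int pending() {
        return queue.size();
    }
}
```

A background thread owned by the plugin would drain the queue and export the buffered payloads in batches.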
Client API
Retrieve broker-generated client instance id:
...
- PushTelemetryRequest|Response - protocol request used by the client to send metrics to any broker it is connected to. The response is used to control push frequency, what metrics the client should push, etc.
- Standardised metrics - a set of standardised metrics that all supporting clients should provide.
- AdminAPI config - the AdminAPI configuration interface with a new CLIENT_METRICS resource type is used to manage metrics subscriptions.
- Client metrics plugin / extending the MetricsReporter interface - a broker plugin interface that performs something meaningful with the metrics. This plugin will typically forward the metrics to a time series database. It is recommended that any broker-side processing is kept to a minimum for scalability and performance reasons.
...
The OpenTelemetry specification will be used as the metrics serialization format as well as to define how metrics are constructed when it comes to naming, metric type, etc.
See the OpenTelemetry metrics specification for more information.
Encoding
While the protocol-level request will allow for other metrics formats, Kafka will only ship with OpenTelemetry support, and a client that claims support for this KIP is required to use OpenTelemetry by default as well.
Metric payloads are encoded as OpenTelemetry ExportMetricsServiceRequest protobuf objects.
Naming
Metrics will be named in a hierarchical format:
...
The PushTelemetryRequest.ContentType must then be set to application/x-protobuf+zstd;type=otlp+metrics07 (or any of the other Kafka-supported compression codecs: gzip, snappy, lz4, zstd).
Tests indicate that a compression ratio up to 10x is possible for the standard metrics.
Decompression of the payloads would be handled by the broker prior to forwarding the data to the plugins.
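As a sketch of that broker-side decompression step before the payload reaches the plugin (gzip only, since the JDK has no built-in zstd support; PayloadCodec is a hypothetical helper, not broker code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Illustrative payload compression round trip, as a client (gzip) and
// broker (gunzip, before invoking the metrics plugin) might perform it.
class PayloadCodec {
    static byte[] gzip(byte[] raw) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
                gz.write(raw);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] gunzip(byte[] compressed) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```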
Client behaviour
A client that supports this metric interface and identifies a supporting broker (by detecting at least PushTelemetryRequestV0 in the ApiVersionResponse) will start off (after a short delay) by sending an empty PushTelemetryRequest, with the ClientInstanceId field set to Null, to one randomly selected connected broker. This gathers its client instance id, the desired push interval, the subscribed push metrics, and the metrics formats the broker supports. This is effectively a metrics handshake that is performed each time a client selects a new broker to send metrics to.
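The handshake described above could be sketched as follows; TelemetrySession, PushRequest, and PushResponse are simplified stand-ins for an actual client implementation and the protocol messages:

```java
// Simplified sketch of the client-side metrics handshake state.
class TelemetrySession {
    private String clientInstanceId = null;  // Null until assigned by a broker
    private int nextPushMs = 0;

    record PushRequest(String clientInstanceId, String contentType, byte[] metrics) {}
    record PushResponse(String clientInstanceId, int nextPushMs) {}

    PushRequest nextRequest(byte[] payload) {
        // First request: ClientInstanceId is Null and the payload is empty,
        // asking the broker for an id, push interval and subscription.
        if (clientInstanceId == null)
            return new PushRequest(null, null, new byte[0]);
        // Subsequent requests carry the assigned id and the encoded metrics.
        return new PushRequest(clientInstanceId,
                "application/x-protobuf;type=otlp+metrics07", payload);
    }

    void onResponse(PushResponse resp) {
        if (resp.clientInstanceId() != null)
            clientInstanceId = resp.clientInstanceId(); // assigned on first response
        nextPushMs = resp.nextPushMs();                 // broker controls push cadence
    }

    int nextPushMs() {
        return nextPushMs;
    }
}
```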
...
The metric types in the following tables correspond to the OpenTelemetry v1 metrics protobuf message types. A short summary:
- Sum - Monotonic total count meter (Counter). Suitable for total number of X counters, e.g., total number of bytes sent.
- Gauge - Non-monotonic current value meter (UpDownCounter). Suitable for current value of Y, e.g., current queue count.
- Histogram - Value distribution meter (ValueRecorder). Suitable for latency values, etc.
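To make the Sum versus Gauge distinction concrete, here is a plain-Java illustration (not the OpenTelemetry API) using the connection metrics as an example:

```java
// Sum vs Gauge semantics: a Sum only ever grows (total connections made),
// while a Gauge tracks a current value that goes up and down (open connections).
class ConnectionMetrics {
    private long connectionCreations = 0; // Sum: monotonic
    private long connectionCount = 0;     // Gauge: current value

    void onConnect() {
        connectionCreations++;
        connectionCount++;
    }

    void onDisconnect() {
        connectionCount--; // the Sum is never decremented
    }

    long creations() { return connectionCreations; }
    long current()   { return connectionCount; }
}
```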
...
| Metric name | Type | Labels | Description |
|---|---|---|---|
| client.connection.creations | Sum | FIXME: with broker_id label? | Total number of broker connections made. |
| client.connection.count | Gauge | | Current number of broker connections. |
| client.connection.errors | Sum | reason | Total number of broker connection failures. Label 'reason' indicates the reason: disconnect - remote peer closed the connection. auth - authentication failure. TLS - TLS failure. timeout - client request timeout. close - client closed the connection. |
| client.request.rtt | Gauge | broker_id | Average request latency / round-trip-time to broker and back. |
| client.request.queue.latency | Gauge | broker_id | Average request queue latency, waiting for the request to be sent to the broker. |
| client.request.queue.count | Gauge | broker_id | Number of requests in queue waiting to be sent to the broker. |
| client.request.success | Sum | broker_id | Number of successful requests to the broker, that is, where a response is received with no request-level error (but there may be per-sub-resource errors, e.g., errors for certain partitions within an OffsetCommitResponse). |
| client.request.errors | Sum | broker_id, reason | Number of failed requests. Label 'reason' indicates the reason: timeout - client timed out the request, disconnect - broker connection was closed before response could be received, error - request-level protocol error. |
| client.io.wait.time | Gauge | | Amount of time waiting for socket I/O. FIXME: histogram? Avg? Total? Should this be for POLLOUT only? |
...
| Metric name | Type | Labels | Description |
|---|---|---|---|
| client.producer.record.bytes | Gauge | | Total amount of record memory currently in use by the producer. This includes the record fields (key, value, etc.) as well as any implementation-specific overhead (objects, etc.). |
| client.producer.record.count | Gauge | | Total number of records currently handled by the producer. |
| client.producer.queue.max.bytes | Gauge | | Total amount of queue/buffer memory allowed on the producer queue(s). |
| client.producer.queue.bytes | Gauge | | Current amount of memory used in producer queues. |
| client.producer.queue.max.messages | Gauge | | Maximum number of messages allowed on the producer queue(s). |
| client.producer.queue.messages | Gauge | | Current number of messages on the producer queue(s). |
...
| Metric name | Type | Labels | Description |
|---|---|---|---|
| client.consumer.poll.interval | Gauge FIXME | | The interval at which the application calls poll(), in seconds. |
| client.consumer.poll.last | Gauge | | The number of seconds since the last poll() invocation. |
| client.consumer.poll.latency | Gauge | | The time it takes poll() to return a new message to the application. |
| client.consumer.commit.count | Sum | | Number of commit requests sent. |
| client.consumer.group.assignment.partition.count | Gauge | | Number of partitions currently assigned to this consumer by the group leader. |
| client.consumer.assignment.partition.count | Gauge | | Number of partitions currently assigned to this consumer, either through the group protocol or through assign(). |
| client.consumer.group.rebalance.count | Sum | | Number of group rebalances. |
| client.consumer.record.queue.count | Gauge | | Number of records in the consumer pre-fetch queue. |
| client.consumer.record.queue.bytes | Gauge | | Amount of record memory in the consumer pre-fetch queue. This may also include per-record overhead. |
| client.consumer.record.application.count | Sum | | Number of records consumed by the application. |
| client.consumer.record.application.bytes | Sum | | Memory of records consumed by the application. |
| client.consumer.fetch.latency | Gauge | | FetchRequest latency. |
| client.consumer.fetch.count | Count | | Total number of FetchRequests sent. |
| client.consumer.fetch.failures | Count | | Total number of FetchRequest failures. |
...
| Metric name | Type | Labels | Description |
|---|---|---|---|
| client.producer.partition.queue.bytes | Gauge | topic, partition, acks=all\|none\|leader | Number of bytes queued on partition queue. |
| client.producer.partition.queue.count | Gauge | topic, partition, acks=all\|none\|leader | Number of records queued on partition queue. |
| client.producer.partition.latency | Gauge | topic, partition, acks=all\|none\|leader | Total produce record latency, from application calling send()/produce() to ack received from broker. |
| client.producer.partition.queue.latency | Gauge | topic, partition, acks=all\|none\|leader | Time between send()/produce() and record being sent to broker. |
| client.producer.partition.record.retries | Sum | topic, partition, acks=all\|none\|leader | Number of ProduceRequest retries. |
| client.producer.partition.record.failures | Sum | topic, partition, acks=all\|none\|leader, reason | Number of records that permanently failed delivery. Reason is a short string representation of the reason, which is typically the name of a Kafka protocol error code, e.g., "RequestTimedOut". |
| client.producer.partition.record.success | Sum | topic, partition, acks=all\|none\|leader | Number of records that have been successfully produced. |
...
| Metric name | Type | Labels | Description |
|---|---|---|---|
| client.process.memory.bytes | Gauge | | Current process/runtime memory usage (RSS, not virtual). |
| client.process.cpu.user.time | Sum | | User CPU time used (seconds). |
| client.process.cpu.system.time | Sum | | System CPU time used (seconds). |
| client.process.pid | Gauge | | The process id. Can be used, in conjunction with client.host.name, to map multiple client instances to the same process. Only emitted if private metrics are enabled. |
...
The Kafka protocol has flexible fields that would allow us to construct the metrics serialization with pure Kafka protocol fields, but this would duplicate the efforts of the OpenTelemetry specification where Kafka would have to maintain its own specification. On the broker side the native Kafka protocol metrics would need to be converted to OpenTelemetry (or other) format anyway.
The OpenTelemetry serialization format is all Protobuf and should thus be generally available across all client implementation languages.
...