Status
Current state: Under discussion
Discussion thread: here
JIRA: here TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Being able to centrally monitor and troubleshoot problems with Kafka clients is becoming increasingly important as the use of Kafka expands within organizations and for hosted Kafka services. The typical Kafka client user is now an application owner with little experience in operating Kafka clients, while the cluster operator has profound Kafka knowledge but little insight into the client application.
While the broker already tracks request-level metrics for connected clients, there is a gap in the end-to-end monitoring when it comes to visibility of client internals, be it queue sizes, internal latencies, error counts, application behaviour (such as message processing rate), etc. These are Kafka client metrics, and not application metrics.
This proposal aims to provide: a generic and standardised interface through which a cluster operator or an external system may request client-side metrics and through which a client can push them to the broker; a plugin interface that controls how those metrics are handled by the broker; and a minimum set of standard pre-defined metrics that a Kafka client should expose through this interface. A supporting client should have metrics enabled by default, require no extra configuration, and provide at least the standardised metrics outlined later in this proposal.
One of the key goals of this KIP is to have the proposed metrics and telemetry interface generally available and enabled by default in all of the mainstream Kafka clients, allowing troubleshooting and monitoring as needed without interaction from cluster end-users.
User privacy is an important concern and extra care is taken in this proposal to not expose any information that may compromise the privacy of the client user.
Public Interfaces
This KIP introduces the following new public interfaces:
- New PushTelemetryRequest protocol request type.
- An updated metrics receiver interface in the broker to accommodate the new semantics and format.
- A new CLIENT_METRICS ConfigEntry resource type for configuring metrics subscriptions.
- kafka-client-metrics.sh CLI script to configure and list client metrics subscriptions.
Protocol
PushTelemetryRequest {
  ClientInstanceId string            // client.id + "-" + UUID4, unique for this client instance.
                                     // Must be set to Null on the first request, and to the
                                     // returned ClientInstanceId from the first response
                                     // for all subsequent requests to any broker.
  Terminating bool                   // Client is terminating.
  ContentType string                 // "application/x-protobuf;type=otlp+metrics09" or
                                     // "application/x-protobuf+zstd;type=otlp+metrics09"
  Metrics binary                     // Format specified by ContentType, possibly compressed.
}

PushTelemetryResponse {
  ThrottleTime int32                 // Standard and metric-specific throttling
  ErrorCode int16                    // Error code
  ClientInstanceId string            // Will be set to a generated unique id if the
                                     // request ClientInstanceId was Null, else
                                     // this field will be set to Null.
  AcceptedContentTypes Array[string] // Accepted metric formats.
                                     // Only returned if Request.ContentType is null or empty,
                                     // or ErrorCode is set to UnsupportedFormat(?).
                                     // Also includes the supported compression types.
                                     // Ordered by preference (most suitable type first).
  NextPushMs int32                   // Delay until next PushTelemetry
  RequestedMetrics Array[string]     // Requested metrics prefix string match.
                                     // Empty array: No metrics subscribed.
                                     // Array[0] empty string: All metrics subscribed.
                                     // Array[..]: prefix string match
}
The ContentType in the client's PushTelemetryRequest must include a type parameter describing the data format, in this case otlp+metrics09 for OpenTelemetry Protobuf version 0.9. This is needed since OTLP is not yet at v1 and may thus have backwards-incompatible changes.
Broker metrics receiver
The broker will expose a plugin interface for client telemetry to be forwarded to or collected by external systems. In particular, since we already collect data in OpenTelemetry format, one goal is to make it possible for a plugin to forward metrics directly to an OpenTelemetry collector.
The existing MetricsReporter plugin interface was built for plugins to pull metrics from the broker, but does not lend itself to the model where the broker pushes metric samples to plugins.
In order to minimize the complexity of additional configuration mechanisms, we are proposing to reuse the existing metrics reporter configuration mechanisms, but allow plugins to implement a new interface (trait) to indicate if they support receiving client telemetry.
This also allows a plugin to reuse the existing labels passed through the MetricsContext (e.g. broker, cluster id, and configured labels) and add them to the OpenTelemetry resource labels as needed.
/**
 * A {@link MetricsReporter} may implement this interface to indicate support for
 * collecting client telemetry on the server side.
 */
@InterfaceStability.Evolving
public interface ClientTelemetry {

    /**
     * Called by the broker to create a ClientTelemetryReceiver instance.
     * This instance may be cached by the broker.
     *
     * This method will always be called after the initial call to
     * {@link MetricsReporter#contextChange(MetricsContext)}
     * on the MetricsReporter implementing this interface.
     *
     * @return the telemetry receiver for this metrics reporter
     */
    ClientTelemetryReceiver clientReceiver();
}

@InterfaceStability.Evolving
public interface ClientTelemetryReceiver {
    /**
     * Called by the broker when a client reports telemetry. The associated request context can be used
     * by the plugin to retrieve additional client information such as client ids or endpoints.
     *
     * This method may be called from the request handling thread, and as such should avoid blocking.
     *
     * @param context the client request context for the corresponding PushTelemetryRequest api call
     * @param payload the encoded telemetry payload as sent by the client
     */
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);

    /**
     * Returns the list of accepted content types for client metric payloads supported by this receiver.
     * This method may be used by the broker to inform connecting clients of supported payload formats.
     *
     * <p>e.g.
     * <code>application/x-protobuf+zstd;type=otlp+metrics09</code>
     * </p>
     *
     * @return collection of supported mime types
     */
    Collection<String> acceptedContentTypes();
}

@InterfaceStability.Evolving
public interface ClientTelemetryPayload {

    String clientInstanceId();

    String contentType();

    ByteBuffer data();
}
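For illustration, a minimal sketch of a hypothetical reporter plugin that implements the new trait and forwards raw payloads off the request handler thread; only the ClientTelemetry, ClientTelemetryReceiver and ClientTelemetryPayload interfaces above are defined by this proposal, and their target package is not yet specified:

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical example plugin: forwards raw client telemetry payloads to an
// external system without decoding them on the request handler thread.
// (Imports for the ClientTelemetry* interfaces are omitted as their package is TBD.)
public class ForwardingTelemetryReporter implements MetricsReporter, ClientTelemetry {

    // Bounded queue so that exportMetrics() never blocks the request handler thread;
    // a background thread (not shown) would drain it towards the external system.
    private final BlockingQueue<ClientTelemetryPayload> queue = new LinkedBlockingQueue<>(1000);

    @Override
    public ClientTelemetryReceiver clientReceiver() {
        return new ClientTelemetryReceiver() {
            @Override
            public void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload) {
                // Drop (or count) the payload if the forwarding queue is full rather than blocking.
                queue.offer(payload);
            }

            @Override
            public Collection<String> acceptedContentTypes() {
                return Collections.singletonList("application/x-protobuf;type=otlp+metrics09");
            }
        };
    }

    // Standard MetricsReporter methods (unused by the telemetry path in this sketch).
    @Override public void init(List<KafkaMetric> metrics) { }
    @Override public void metricChange(KafkaMetric metric) { }
    @Override public void metricRemoval(KafkaMetric metric) { }
    @Override public void close() { }
    @Override public void configure(Map<String, ?> configs) { }
}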
Client API
Retrieves the broker-generated client instance id. This may be used by the application to assist in mapping the client instance id to the application instance through log messages or other means.
StringOrError ClientInstance::ClientInstanceId(int timeout_ms)
If the client has not yet requested a client instance id from the broker this call will block up to the timeout, and trigger an immediate request which is otherwise delayed at client startup (see Client behaviour chapter). If no client instance id can be retrieved within the timeout an error is returned/raised (timeout, feature not supported by broker, auth failure, etc).
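For illustration, a Java application might surface the id in its own logs at startup; the clientInstanceId(Duration) method below is a hypothetical Java mapping of the language-neutral API above, not a finalised signature:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Properties;

public class InstanceIdExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Hypothetical Java mapping of ClientInstanceId(timeout): blocks up to the
            // timeout if the id has not yet been acquired from a broker.
            String instanceId = producer.clientInstanceId(Duration.ofSeconds(5));
            System.out.println("Producer client instance id: " + instanceId);
        }
    }
}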
Client configuration
enable.metrics.push=true|false (default: true) - Allows disabling the functionality outlined in this proposal, which is enabled by default. (Or enable.telemetry?)
New error codes
TBD
Proposed Changes
Overview
This feature is made up of the following components:
- PushTelemetryRequest|Response - protocol request used by the client to send metrics to any broker it is connected to. The response is used to control push frequency, what metrics the client should push, etc.
- Standardised metrics - a set of standardised metrics that all supporting clients should provide.
- AdminAPI config - the AdminAPI configuration interface with a new CLIENT_METRICS resource type is used to manage metrics subscriptions.
- Client metrics plugin / extending the MetricsReporter interface - a broker plugin interface that performs something meaningful with the metrics. This plugin will typically forward the metrics to a time series database. It is recommended that any broker-side processing is kept to a minimum for scalability and performance reasons.
Client identification (CLIENT_INSTANCE_ID)
The added value of metrics collection is limited if the metrics can’t be tied to an entity, in this case a client instance. While aggregate metrics do provide value in trends and superficial monitoring, for metrics to be truly useful there needs to be a way to identify a particular client instance, where it runs, by who, etc.
While Kafka has a per-request client.id (which most Kafka clients allow to be optionally configured), it is in no way unique and can’t be used, despite its name, to identify a single client instance.
Other seemingly unique identifiers, such as authentication principals or the client's source IP address and port, are either not guaranteed to be unique (authentication may not be used, or the principal may not be specific to a single client instance) or not a singleton (a client has multiple connections to the cluster, each with its own address+port combination).
Since this KIP intends to have metrics enabled by default in supporting clients we can’t rely on the client.id being properly set by the application owner, so we have to resort to some other mechanism for a client instance to uniquely identify itself. There are ideally two entities that we would like to identify in metrics:
- Client instance - a single instance of a producer, consumer, Admin client, etc.
- Application process instance - a single application runtime, which may contain multiple client instances. E.g. a Kafka Streams application with a mix of Admin, producer and consumer instances.
This proposal suggests that as part of the initial metrics handshake the broker generates a unique client instance id based on the client.id and a UUID4 hex string and returns it to the client. The client must use the returned client instance id for the remaining lifetime of the client instance.
Prefixing the client instance id with the client.id allows the metrics configuration interface to match seamlessly on just the client.id (possibly matching multiple clients) as well as on the full client instance id (matching a single client instance).
The client should provide an API for the application to read the generated client instance id, to assist in mapping and identification of the client based on the collected metrics. As an alternative or complement, the client may log its client instance id upon receiving it in the first PushTelemetryResponse. This allows live and post-mortem correlation between collected metrics and a client instance.
As it is not feasible for a Kafka client instance to automatically generate or acquire a unique identity for the application process it runs in, and as we can’t rely on the user to configure one, we treat the application instance id as an optional future nice-to-have that may be included as a metrics label if it has been set by the user. This allows a more direct mapping of client instance to application instance and vice versa. However, due to these constraints, adding an application instance id is outside the scope of this proposal.
Kafka Streams applications have an application.id configured and this identity should be included as the application_id metrics label.
Mapping
Mapping the client instance id to an actual application instance running on a (virtual) machine can be done by inspecting the metrics resource labels, such as the client source address and source port, or security principal, all of which are added by the receiving broker. This will allow the operator together with the user to identify the actual application instance.
Metrics naming and format
The OpenTelemetry specification will be used as the metrics serialization format as well as to define how metrics are constructed when it comes to naming, metric type, etc.
See the OpenTelemetry metrics specification for more information.
Encoding
While the protocol-level request will allow for other metrics formats, Kafka will only ship with OpenTelemetry and it is required for a client that claims support of this KIP to use OpenTelemetry by default as well.
Metric payloads are encoded as OpenTelemetry ExportMetricsServiceRequest protobuf objects.
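For illustration, a broker-side plugin might decode a (decompressed) payload with the Java bindings generated from the OpenTelemetry protobuf definitions; the class and accessor names below follow the generated opentelemetry-proto code and are assumptions insofar as this KIP does not pin a specific binding version:

import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;

import java.nio.ByteBuffer;

public class PayloadDecodeExample {
    // Decode an uncompressed OTLP payload and print the resource-level labels it contains.
    public static void decode(ByteBuffer payload) throws Exception {
        ExportMetricsServiceRequest request = ExportMetricsServiceRequest.parseFrom(payload);
        for (ResourceMetrics rm : request.getResourceMetricsList()) {
            rm.getResource().getAttributesList().forEach(kv ->
                System.out.println(kv.getKey() + "=" + kv.getValue().getStringValue()));
        }
    }
}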
Naming
Metrics will be named in a hierarchical format:
<namespace>.<metric.name>
E.g.:
org.apache.kafka.client.producer.partition.queue.bytes
The metric namespace is org.apache.kafka for the standard metrics, vendor/implementation specific metrics may be added in separate namespaces, e.g:
librdkafka.client.producer.xmitq.latency
io.confluent.client.python.object.count
The Apache Kafka Java client would provide implementation specific metrics in:
org.apache.kafka.client.java.producer.socket.buffer.full.count
Metrics may hold any number of key=value labels which provide the multi-dimensionality of metrics, e.g., the per-partition queue bytes metric described above would have (at least) the following labels:
topic=mytopic
partition=3
acks=all
A client instance is considered a metric resource and the resource-level (thus client instance level) labels could include:
client_software_name=confluent-kafka-python
client_software_version=v2.1.3
client_instance_id=someClientId-B64CD139-3975-440A-91D4
idempotence=true
transactional_id=someTxnApp
Sparse metrics
To keep metrics volume down it is recommended that a client only sends metrics with a recorded value.
Metrics are to be sent as CUMULATIVE values, rather than DELTA, which allows for missed/dropped transmits without data loss.
Compression
As metric names and labels are highly repeated it is recommended that the serialized metrics are compressed prior to sending them to the broker if the serialized uncompressed size exceeds 1000 bytes.
The PushTelemetryRequest.ContentType must then be set to application/x-protobuf+zstd;type=otlp+metrics09 (or any of the other Kafka-supported compression codecs: gzip, snappy, lz4, zstd).
Tests indicate that a compression ratio up to 10x is possible for the standard metrics.
Decompression of the payloads will be handled by the broker prior to forwarding the data to the plugins.
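A minimal client-side sketch of this rule using gzip from the Java standard library; the 1000-byte threshold and the ";type=otlp+metrics09" parameter follow the text above, the "application/x-protobuf+gzip" content type is an assumption following the "+zstd" pattern, and the CompressedPayload holder is hypothetical:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class TelemetryCompression {
    static final int COMPRESSION_THRESHOLD_BYTES = 1000;

    // Returns the bytes to place in PushTelemetryRequest.Metrics along with the matching
    // ContentType, compressing only when the payload is large enough to benefit.
    public static CompressedPayload maybeCompress(byte[] serializedOtlpMetrics) throws IOException {
        if (serializedOtlpMetrics.length <= COMPRESSION_THRESHOLD_BYTES) {
            return new CompressedPayload(serializedOtlpMetrics,
                "application/x-protobuf;type=otlp+metrics09");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(serializedOtlpMetrics);
        }
        return new CompressedPayload(out.toByteArray(),
            "application/x-protobuf+gzip;type=otlp+metrics09");
    }

    // Simple holder for the payload bytes and the ContentType to send.
    public static class CompressedPayload {
        public final byte[] data;
        public final String contentType;
        CompressedPayload(byte[] data, String contentType) {
            this.data = data;
            this.contentType = contentType;
        }
    }
}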
Client behaviour
A client that supports this metrics interface and detects a supporting broker (by seeing at least PushTelemetryRequestV0 in the ApiVersionResponse) will start off, after a short delay, by sending an empty PushTelemetryRequest, with the ClientInstanceId field set to Null, to one randomly selected connected broker. This gathers its client instance id, the desired push interval, the subscribed metrics, and the metrics formats the broker supports. This is effectively a metrics handshake, and it is performed each time a client selects a new broker to send metrics to.
This initial handshake should be delayed by the client by 30 seconds after the client instance has been created, to avoid short-lived, one-off clients (such as Admin API tools) negotiating and sending metrics and triggering state creation for the client on the receiving broker.
Upon receiving the PushTelemetryResponse the client must wait until the NextPushMs has expired before sending the next PushTelemetryRequest including the requested metrics. The NextPushMs may change in each PushTelemetryResponse and the client should react accordingly.
If PushTelemetryResponse.RequestedMetrics indicates that no metrics are desired the client must still send an empty PushTelemetryRequest by the next NextPushMs interval to get an updated set of desired or immediate metrics as well as a new NextPushMs time.
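A simplified sketch of this handshake and push scheduling follows; the transport and collection hooks (pushTelemetry, collectSubscribedMetrics, clientIsRunning) are hypothetical abstractions, and broker re-selection, termination and error handling are omitted:

import java.util.List;

// Simplified client-side handshake and push loop; transport and metric collection
// are abstracted behind hypothetical methods.
public abstract class TelemetryPushLoop {

    // Hypothetical transport/collection hooks.
    protected abstract PushTelemetryResponse pushTelemetry(String clientInstanceId,
                                                           boolean terminating,
                                                           byte[] metricsPayload);
    protected abstract byte[] collectSubscribedMetrics(List<String> subscription);
    protected abstract boolean clientIsRunning();

    public void run() throws InterruptedException {
        String clientInstanceId = null;            // Null until assigned by a broker.
        List<String> subscription = List.of();     // No subscribed metrics until told otherwise.
        long nextPushMs = 30_000;                  // Initial handshake is delayed ~30s after creation.

        while (clientIsRunning()) {
            Thread.sleep(nextPushMs);

            // Empty payload for the handshake and whenever nothing is subscribed;
            // otherwise push the metrics matching the current subscription.
            byte[] payload = subscription.isEmpty()
                ? new byte[0]
                : collectSubscribedMetrics(subscription);

            PushTelemetryResponse response = pushTelemetry(clientInstanceId, false, payload);

            if (clientInstanceId == null)
                clientInstanceId = response.clientInstanceId(); // Keep for the client's lifetime.

            subscription = response.requestedMetrics();  // Prefix matches; may be empty.
            nextPushMs = response.nextPushMs();          // Broker controls the push cadence.
        }
    }

    // Minimal response view used by this sketch (mirrors the protocol fields above).
    public interface PushTelemetryResponse {
        String clientInstanceId();
        List<String> requestedMetrics();
        int nextPushMs();
    }
}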
Client termination
When a client is being shut down it should send its final metrics regardless of the NextPushMs time, but only if the last PushTelemetryResponse.RequestedMetrics array was non-empty.
To prevent the receiving broker's metrics rate-limiter from discarding this out-of-profile push, the PushTelemetryRequest.Terminating field must be set to true. A broker must only allow one such out-of-profile push in a row and otherwise throttle the client as if the field was not set.
The metrics should contain the reason for the client termination by including the client.terminating metric with the label "reason" set to a human-readable explanation of why the client is being shut down, such as "Fatal error: Static consumer fenced by newer instance" or "Consumer closed by application".
Error handling
Actions to be taken by the client if the PushTelemetryResponse.ErrorCode is set to a non-zero value.
Error code | Reason | Action |
..AuthorizationFailed | Client is not permitted to send metrics. | Raise a non-fatal error to the application and schedule the next PushTelemetryRequest to 30 minutes. |
UnsupportedEncoding | Broker does not support the client’s metric encoding type. | Raise a non-fatal error to the application and schedule the next PushTelemetry to 30 minutes. |
InvalidRecord | Broker failed to decode or validate the client’s encoded metrics. | Raise a non-fatal error to the application and schedule the next PushTelemetry to 5 minutes. |
FunctionalityNotEnabled | Client sent a PushTelemetryRequest to a broker which does not have a client metrics receiver plugin configured. | Raise a non-fatal error to the application and schedule the next PushTelemetry to 30 minutes on another broker. |
RateLimited | Client sent a non-Terminate push request before the NextPushMs time had expired. | The response will include only the NextPushMs which the client will use to adjust its next push time. |
UnsupportedCompressionType | Client’s compression type is not supported by the broker. | The client should switch to another compression type. When all compression types are exhausted the application should fall back to uncompressed format. |
The 5 and 30 minute delays ensure that the client eventually retries, avoiding having to restart clients if the cluster metrics configuration is temporarily disabled, e.g., by operator error (see the sketch below).
How non-fatal errors are propagated to the application is client and language specific; simply logging the error is sufficient.
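For illustration, a sketch of how a client might map these error codes onto its retry schedule; the error-code constants are placeholders since the actual codes are still TBD, and RateLimited / UnsupportedCompressionType are handled separately (NextPushMs from the response, compression fallback) rather than by a fixed delay:

import java.time.Duration;

public class PushErrorHandling {
    // Placeholder error codes; the final names/values are TBD in this KIP.
    static final short AUTHORIZATION_FAILED = 1;
    static final short UNSUPPORTED_ENCODING = 2;
    static final short INVALID_RECORD = 3;
    static final short FUNCTIONALITY_NOT_ENABLED = 4;

    // Maps a non-zero PushTelemetryResponse.ErrorCode to the delay before the next attempt.
    static Duration nextPushDelay(short errorCode) {
        switch (errorCode) {
            case AUTHORIZATION_FAILED:
            case UNSUPPORTED_ENCODING:
            case FUNCTIONALITY_NOT_ENABLED:
                return Duration.ofMinutes(30);   // Retry eventually; avoids requiring client restarts.
            case INVALID_RECORD:
                return Duration.ofMinutes(5);
            default:
                return Duration.ofMinutes(5);    // Conservative default for unknown errors.
        }
    }
}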
Opting out
Client metrics should be enabled by default in the client but a user may disable the metrics functionality by setting the enable.metrics.push property to false.
Even when metrics are enabled in a client, the client will only push metrics that have been configured/subscribed in the cluster.
Also see the Privacy chapter below.
Receiving broker behaviour
We allow the client to send PushTelemetryRequest to any connected broker that reports support for PushTelemetryRequest in its ApiVersionResponse.
If PushTelemetryRequest.ClientInstanceId is Null the broker will generate a unique id for the client and return it in the response. Subsequent PushTelemetryRequests from the client to any broker must have the ClientInstanceId set to this returned value.
The broker should add additional metrics labels to help identify the client instance before forwarding the metrics to an external system, such as the namespace, cluster id, client principal, client source address and port, etc. As the metrics plugin may need to add further information on top of this, the generic metrics receiver in the broker will not add these labels itself but will rely on the plugins to do so; this avoids deserializing and re-serializing the received metrics multiple times in the broker.
See Broker added labels below for the list of labels that should be added by the plugin.
If there is no client metrics receiver plugin configured on the broker it will respond to PushTelemetryRequests with PushTelemetryResponse.ErrorCode set to FunctionalityNotEnabled. The client should not attempt a PushTelemetryRequest until at least 30 minutes has passed, which allows the metrics receiver to be enabled or disabled without having to restart the broker or reset the client connection.
PushTelemetryRequest handling
Validation
Validation of the encoded metrics is the task of the ClientTelemetryReceiver. If the encoding type is unsupported the response will be returned with ErrorCode set to UnsupportedEncoding. Should decoding or validation of the binary metrics blob fail, the ErrorCode will be set to InvalidRecord.
Rate-limiting and NextPushMs
The receiving broker’s standard quota-based throttling should operate as usual for PushTelemetryRequest, but in addition the PushTelemetryRequest is subject to rate-limiting based on the NextPushMs interval derived from the configured metrics subscriptions. Should the client send a push request prior to expiry of the previously calculated NextPushMs, the broker will discard the metrics and return a PushTelemetryResponse with ErrorCode set to RateLimited and NextPushMs set to the remaining time; all other fields should be left blank.
The one exception to this rule is when the client sets the PushTelemetryRequest.Terminating field to true indicating that the client is terminating, in this case the metrics should be accepted by the broker, but a consecutive request must ignore the Terminating field and apply rate-limiting as if the field was not set. The Terminating flag may be reused upon the next expiry of NextPushMs.
If the receiving broker has no prior metrics state but at least one matching metrics subscription for a client instance id the first returned NextPushMs is a randomised value between 0 and the lowest metrics interval of the subscription. This attempts to distribute the client metrics pushes in case a large number of clients are started simultaneously.
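A sketch of the per-client-instance checks a receiving broker might apply, combining the rate-limiting, the single Terminating exception, and the randomised first interval described above; the ClientMetricsState holder is hypothetical:

import java.util.concurrent.ThreadLocalRandom;

public class PushRateLimiter {

    // Hypothetical per-client-instance state kept by the receiving broker.
    static class ClientMetricsState {
        long lastAcceptedPushMs;   // Wall-clock time of the last accepted push.
        long nextPushIntervalMs;   // Interval handed out in the last response.
        boolean terminatingUsed;   // A Terminating push has already been accepted this interval.
    }

    // First response for an unknown client instance: random delay in [0, lowest subscription
    // interval) to spread out pushes when many clients start at the same time.
    static long firstNextPushMs(long lowestSubscriptionIntervalMs) {
        return ThreadLocalRandom.current().nextLong(lowestSubscriptionIntervalMs);
    }

    // Returns true if the push should be accepted, false if it should be rejected with RateLimited.
    static boolean acceptPush(ClientMetricsState state, boolean terminating, long nowMs) {
        boolean withinInterval = nowMs < state.lastAcceptedPushMs + state.nextPushIntervalMs;
        if (!withinInterval) {
            state.terminatingUsed = false;       // Interval expired: Terminating may be used again.
            return true;
        }
        if (terminating && !state.terminatingUsed) {
            state.terminatingUsed = true;        // Allow one out-of-profile terminating push.
            return true;
        }
        return false;                            // Reject; NextPushMs is set to the remaining time.
    }
}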
Request and response size estimations / FIXME / WIP
A producer's typical static (e.g., standard collecting, non-troubleshooting) subscription set may look something like:
- org.apache.kafka/client.producer. // prefix match
- org.apache.kafka/client.connection.errors
- org.apache.kafka/client.request.errors
With a recommended collection interval of 60 seconds.
This gives a PushTelemetryResponse size of roughly respHdr(8) + respFields(20) + clientId(~60) + subscriptionStrings(36 + 43 + 41) = 208 bytes, every 60 seconds, per connected producer client per cluster.
Depending on the amount of metrics the producer implementation provides for the given prefixes, the PushTelemetryRequest with the above metrics would yield FIXME(count it), compressed down to FIXME, every 60 seconds, per connected producer client per cluster. FIXME: For now, 1000 bytes. Compression ratio 8x.
10000 producers with these metrics subscribed, in a 12 broker cluster yields the following numbers:
- 14 PushTelemetryRequests/second per broker
- 15 kb/s requests / broker
- 3 kb/s responses / broker
- 112 kb/s uncompressed OTLP metrics provided to metrics plugin per broker. 1.3MB/s per cluster.
Note: A future optimization could be to cache subscription sets and identify each set by a unique id (e.g., the checksum of the set) generated by the broker and passed down to the client in the PushTelemetryResponse. The client could then quickly see if the subscription needed to be changed or not.
Metrics subscription
Metrics subscriptions are configured through the standard Kafka Admin API configuration interface with the new resource-type CLIENT_METRICS, the resource-name is a prefix-match of the client instance id (which as stated previously is a concatenation of the client.id and a unique uuid string). The configuration entry name is a prefix-matched metrics name (e.g., “client.producer.partition.”) while the configuration value is the desired collection/push interval in milliseconds.
Example:
$ kafka-configs.sh --bootstrap-server $BROKERS \
    --entity-type client_metrics \
    --entity-name 'rdkafka' \
    --alter \
    --add-config "org.apache.kafka/client.producer.partition.queue.=15000"
Or using the dedicated tool:
$ kafka-client-metrics.sh --bootstrap-server $BROKERS \
    --add \
    --id rdkafka \                                                # prefix match
    --metric org.apache.kafka/client.producer.partition.queue. \  # prefix match
    --interval-ms 15000
As the assumption is that the number of CLIENT_METRICS configuration entries will be relatively small (<100), all brokers with a configured metrics plugin will monitor and cache the configuration entries for the CLIENT_METRICS resource type.
When a PushTelemetryRequest is received for a previously unknown client instance id, the config cache is queried for any configuration that prefix-matches the request’s client instance id. The resulting matching configuration entries are compiled into a list of subscribed metrics, which is returned in PushTelemetryResponse.RequestedMetrics along with the minimum configured collection interval. (This can be improved in future versions by adding a scheduler in the broker that interleaves requests/responses per metrics configuration entry, so that each subscribed metric is collected at its configured interval; in the current form, longer-interval metrics are included “for free” if there are shorter-interval metrics in the subscription set.)
This client instance specific state is maintained in memory up to MAX(60*1000, NextPushMs * 3) milliseconds and is used to enforce the push interval rate-limiting. There is no persistence of client instance metrics state across broker restarts or between brokers.
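For illustration, a sketch of how a broker might compile the subscription for a previously unknown client instance id from its cached CLIENT_METRICS entries; the cache and result types are hypothetical:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SubscriptionMatcher {

    // One cached CLIENT_METRICS resource: resource name (client instance id prefix)
    // mapped to config entries of metric-name prefix -> push interval in ms.
    static class ClientMetricsConfig {
        final String clientInstanceIdPrefix;
        final Map<String, Long> metricPrefixToIntervalMs;
        ClientMetricsConfig(String prefix, Map<String, Long> entries) {
            this.clientInstanceIdPrefix = prefix;
            this.metricPrefixToIntervalMs = entries;
        }
    }

    // Result returned to the client: subscribed metric prefixes and the push interval,
    // which is the minimum configured interval across all matching entries.
    static class Subscription {
        final List<String> metricPrefixes = new ArrayList<>();
        long pushIntervalMs = Long.MAX_VALUE;
    }

    static Subscription compile(String clientInstanceId, List<ClientMetricsConfig> cache) {
        Subscription subscription = new Subscription();
        for (ClientMetricsConfig config : cache) {
            if (!clientInstanceId.startsWith(config.clientInstanceIdPrefix))
                continue;
            for (Map.Entry<String, Long> entry : config.metricPrefixToIntervalMs.entrySet()) {
                subscription.metricPrefixes.add(entry.getKey());
                subscription.pushIntervalMs = Math.min(subscription.pushIntervalMs, entry.getValue());
            }
        }
        return subscription;   // Empty prefixes => no metrics subscribed for this client.
    }
}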
Client metrics and metric labels
Clients that support this KIP must provide the following metrics, as long as they’re relevant for the client implementation. These metrics do not need to be comparable between client implementations, e.g., io.wait.time may have somewhat different semantics between Java client and librdkafka.
We try to avoid metrics that are available or derivable from existing broker metrics, such as request rates, etc.
All standard metrics are prefixed with “org.apache.kafka.”, this prefix is omitted from the following tables for brevity.
A client implementation must not add additional metrics or labels under the “org.apache.kafka.” prefix without a corresponding accepted KIP.
Private metrics
The default metrics collection on the client must take extra care not to expose any information about the application and system it runs on as they may identify internal projects, infrastructure, etc, that the user may not want to expose to the Kafka infrastructure owner. This includes information such as hostname, operating system, credentials, runtime environment, etc. None of the metrics defined in this proposal expose any such information.
OpenTelemetry specifies a range of relevant metrics:
Private or host metrics are outside the scope of this specification.
Metric types
The metric types in the following tables correspond to the OpenTelemetry v1 metrics protobuf message types. A short summary:
- Sum - Monotonic total count meter (Counter). Suitable for total number of X counters, e.g., total number of bytes sent.
- Gauge - Non-monotonic current value meter (UpDownCounter). Suitable for current value of Y, e.g., current queue count.
- Histogram - Value distribution meter (ValueRecorder). Suitable for latency values, etc.
For simplicity a client implementation may choose to provide an average value as a Gauge instead of a Histogram. These averages should use the original Histogram metric name + ".avg60s" (or whatever the averaging period is), e.g., "client.request.rtt.avg10s".
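As a sketch of that simplification, a client could maintain a windowed average and report it as a Gauge named with the ".avg60s" (or similar) suffix; the class below is illustrative only:

public class WindowedAverage {
    private double sum;
    private long count;

    // Record one observation, e.g. a request round-trip time in milliseconds.
    public synchronized void record(double value) {
        sum += value;
        count++;
    }

    // Read and reset at the end of each averaging window (e.g. every 60s) and report the
    // result as a Gauge named "<histogram metric name>.avg60s".
    public synchronized double readAndReset() {
        double avg = count == 0 ? 0.0 : sum / count;
        sum = 0.0;
        count = 0;
        return avg;
    }
}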
Client instance-level metrics
Metric name | Type | Labels | Description |
client.connection.creations | Sum | FIXME: with broker_id label? | Total number of broker connections made. |
client.connection.count | Gauge | | Current number of broker connections. |
client.connection.errors | Sum | reason | Total number of broker connection failures. Label ‘reason’ indicates the reason: disconnect - remote peer closed the connection. auth - authentication failure. TLS - TLS failure. timeout - client request timeout. close - client closed the connection. |
client.request.rtt | Histogram | broker_id | Average request latency / round-trip-time to broker and back |
client.request.queue.latency | Histogram | broker_id | Average request queue latency waiting for request to be sent to broker. |
client.request.queue.count | Gauge | broker_id | Number of requests in queue waiting to be sent to broker. |
client.request.success | Sum | broker_id | Number of successful requests to broker, that is, where a response is received with no request-level error (but there may be per-sub-resource errors, e.g., errors for certain partitions within an OffsetCommitResponse). |
client.request.errors | Sum | broker_id reason | Number of failed requests. Label ‘reason’ indicates the reason: timeout - client timed out the request, disconnect - broker connection was closed before response could be received, error - request-level protocol error. |
client.io.wait.time | Histogram | | Amount of time waiting for socket I/O writability (POLLOUT). A high number indicates socket send buffer congestion. |
As the client will not know the broker id of its bootstrap servers, the broker_id label should be set to the negative index of the broker in the client's bootstrap.servers list, starting at -1.
Client instance-level Producer metrics
Metric name | Type | Labels | Description |
client.producer.record.bytes | Gauge | | Total amount of record memory currently in use by producer. This includes the record fields (key, value, etc) as well as any implementation-specific overhead (objects, etc). |
client.producer.record.count | Gauge | | Total number of records currently handled by producer. |
client.producer.queue.max.bytes | Gauge | | Total amount of queue/buffer memory allowed on the producer queue(s). |
client.producer.queue.bytes | Gauge | | Current amount of memory used in producer queues. |
client.producer.queue.max.messages | Gauge | | Maximum number of messages allowed on the producer queue(s). |
client.producer.queue.messages | Gauge | | Current number of messages on the producer queue(s). |
Client instance-level Consumer metrics
Metric name | Type | Labels | Description |
client.consumer.poll.interval | Histogram | | The interval at which the application calls poll(), in seconds. |
client.consumer.poll.last | Gauge | | The number of seconds since the last poll() invocation. |
client.consumer.poll.latency | Histogram | | The time it takes poll() to return a new message to the application. |
client.consumer.commit.count | Sum | | Number of commit requests sent. |
client.consumer.group.assignment.strategy | String | | Current group assignment strategy in use. |
client.consumer.group.assignment.partition.count | Gauge | | Number of partitions currently assigned to this consumer by the group leader. |
client.consumer.assignment.partition.count | Gauge | | Number of partitions currently assigned to this consumer, either through the group protocol or through assign(). |
client.consumer.group.rebalance.count | Sum | | Number of group rebalances. |
client.consumer.group.error.count | Sum | error | Consumer group error counts. The error label depicts the actual error, e.g., "MaxPollExceeded", "HeartbeatTimeout", etc. |
client.consumer.record.queue.count | Gauge | | Number of records in consumer pre-fetch queue. |
client.consumer.record.queue.bytes | Gauge | | Amount of record memory in consumer pre-fetch queue. This may also include per-record overhead. |
client.consumer.record.application.count | Sum | | Number of records consumed by application. |
client.consumer.record.application.bytes | Sum | | Memory of records consumed by application. |
client.consumer.fetch.latency | Histogram | | FetchRequest latency. |
client.consumer.fetch.count | Count | | Total number of FetchRequests sent. |
client.consumer.fetch.failures | Count | | Total number of FetchRequest failures. |
Client topic-level Producer metrics
Metric name | Type | Labels | Description |
client.producer.partition.queue.bytes | Gauge | topic partition acks=all|none|leader | Number of bytes queued on partition queue. |
client.producer.partition.queue.count | Gauge | topic partition acks=all|none|leader | Number of records queued on partition queue. |
client.producer.partition.latency | Histogram | topic partition acks=all|none|leader | Total produce record latency, from application calling send()/produce() to ack received from broker. |
client.producer.partition.queue.latency | Histogram | topic partition acks=all|none|leader | Time between send()/produce() and record being sent to broker. |
client.producer.partition.record.retries | Sum | topic partition acks=all|none|leader | Number of ProduceRequest retries. |
client.producer.partition.record.failures | Sum | topic partition acks=all|none|leader reason | Number of records that permanently failed delivery. Reason is a short string representation of the reason, which is typically the name of a Kafka protocol error code, e.g., “RequestTimedOut”. |
client.producer.partition.record.success | Sum | topic partition acks=all|none|leader | Number of records that have been successfully produced. |
The “partition” label should be “unassigned” for not yet partitioned messages, as they are not yet assigned to a partition queue.
Host process metrics (optional)
These metrics provide runtime information about the operating system process the client runs in.
Metric name | Type | Labels | Description |
client.process.memory.bytes | Gauge | | Current process/runtime memory usage (RSS, not virtual). |
client.process.cpu.user.time | Sum | | User CPU time used (seconds). |
client.process.cpu.system.time | Sum | | System CPU time used (seconds). |
client.process.io.wait.time | Sum | | IO wait time (seconds). |
client.process.pid | Gauge | | The process id. Can be used, in conjunction with the client host name, to map multiple client instances to the same process. |
Standard client resource labels
Label name | Description |
client_software_name | The client’s implementation name. |
client_software_version | The client’s version. |
client_instance_id | The generated CLIENT_INSTANCE_ID. |
client_id | The configured client.id. |
application_id | application.id (Kafka Streams only) |
client_rack | client.rack (if configured) |
group_id | group.id (consumer) |
group_instance_id | group.instance.id (consumer) |
group_member_id | Group member id (if any, consumer) |
transactional_id | transactional.id (producer) |
Broker-added labels
The following labels are added by the broker as metrics are received:
Label name | Description |
client_source_address | The client connection’s source address. |
client_source_port | The client connection’s source port. |
principal | Client’s security principal. Content depends on authentication method. |
broker_id | Receiving broker’s node-id. |
Security
Since client metric subscriptions are primarily aimed at the infrastructure operator that is managing the Kafka cluster it should be sufficient to limit the config control operations to the CLUSTER resource.
There will be no permission checks on the PushTelemetryRequest itself.
API Request | Resource | ACL |
DescribeConfigs | CLUSTER | DESCRIBE,READ |
AlterConfigs | CLUSTER | WRITE |
PushTelemetryRequest | N/A | N/A |
See the Private metrics chapter above for privacy/integrity.
Tools
A wrapper tool around kafka-configs.sh that makes it easier to set up metric subscriptions.
bin/kafka-client-metrics.sh [arguments] --bootstrap-server <brokers>

List configured metrics:
    --list [--id <client-instance-id> | --id-prefix <prefix-match>]

Add metrics:
    --add
    --id <client-instance-id> | --id-prefix <prefix-match>
    --metric <metric-prefix>..
    --interval-ms <interval>

Delete metrics:
    --delete
    --id <client-instance-id> | --id-prefix <prefix-match>
    [--metric <metric-prefix>]..

Example:

# Subscribe to producer partition queue and memory usage
# metrics every 60s from all librdkafka clients.
$ kafka-client-metrics.sh --bootstrap-server localhost:9092 \
    --add \
    --id-prefix rdkafka \
    --metric org.apache.kafka/client.producer.partition. \
    --metric librdkafka.client.memory. \
    --interval-ms 60000

# For a specific client instance:
$ kafka-client-metrics.sh --bootstrap-server localhost:9092 \
    --delete \
    --id rdkafka-33E53F93-C6D0-4136-A4A7-6308050AFFF3 \
    --metric org.apache.kafka/client.request.errors

# Subscribe to all metrics by providing an empty metric.
$ kafka-client-metrics.sh --bootstrap-server localhost:9092 \
    --add \
    --id rdkafka-33E53F93-C6D0-4136-A4A7-6308050AFFF3 \
    --metric '' \
    --interval-ms 120000

# The metrics themselves are not viewable with this CLI tool
# since the storage of metrics is plugin-dependent.
Example workflows
Example workflows showcasing how this feature may be used for proactive and reactive purposes.
Proactive monitoring
The Kafka cluster is configured to collect all standard metrics pushed by the client at an interval of 60 seconds. The metrics plugin forwards the collected metrics to an imaginary system that monitors a set of well-known metrics and triggers alarms when trends go out of profile.
The monitoring system detects an anomaly for CLIENT_INSTANCE_ID=java-producer-1234’s metric org.apache.kafka.client.producer.partition.queue.latency which for more than 180 seconds has exceeded the threshold of 5000 milliseconds.
Before sending the alert to the incident management system the monitoring system collects a set of labels that are associated with this CLIENT_INSTANCE_ID, such as:
- client.id
- source_address and source_port on broker id X (1 or more such mappings based on how many connections the client has used to push metrics).
- principal
- tenant
- client_software_name and client_software_version
- In case of consumer: group_id, group_instance_id (if configured) and the latest known group_member_id.
- In case of transactional producer: transactional_id
This collection of information, along with the triggered metric, is sent to the incident management system for further investigation or aggregation, and provides enough information to identify who runs the client and where it runs. Further action might be to contact the organization or team that matches the principal, transactional.id, source address, etc, for further investigation.
Reactive monitoring
The Kafka cluster configuration for metrics collection is irrelevant to this use-case, given that the proper metrics plugin is enabled. The metrics plugin is configured to write metrics to a topic. A support system with an interactive interface is reading from this metrics topic, and has an Admin client to configure the cluster with desired metrics subscriptions.
The application owner reports a lagging consumer that is not able to keep up with the incoming message rate and asks for the Kafka operator to help troubleshoot. The application owner, who unfortunately does not know the client instance id of the consumer, provides the client.id, userid and source address.
The Kafka operator adds a metrics subscription for metrics matching prefix “org.apache.kafka.client.consumer.” and with the corresponding client.id as resource-name prefix. Since this is a live troubleshooting case the metrics push interval is set to a low 10 seconds.
The metrics subscription propagates through configuration change notifications to all brokers which update their local metrics subscription config cache.
Upon the next scheduled PushTelemetryRequest from the consumer, the receiving broker checks its metrics subscription cache, finds that the consumer’s client instance id prefix-matches a configuration entry, and responds with a PushTelemetryResponse including the subscribed metrics and a random NextPushMs in the range of 0 to the configured interval (10 seconds).
Upon the next PushTelemetryRequest, which now includes metrics for the subscribed metrics, the metrics are written to the output topic and the NextPushMs is adjusted to the configured interval of 10 seconds. This repeats until the metrics subscription configuration is changed.
As the consumer metrics are now being written to the metrics topic the support system reads the metrics, sees that there is an active viewer for the given client.id, and displays the metrics to the operator.
Because the client.id may not be unique there might be metrics from multiple consumers, so the operator adds a local display filter on the source_address and the principal’s user_id labels, which are matched to the labels in the metrics data, and the view is updated to only show the single matching consumer.
The operator identifies an increasing trend in client.consumer.processing.time which indicates slow per-message processing in the application and reports this back to the application owner, ruling out the client and Kafka cluster from the problem space.
The operator removes the metrics subscription for the client.id, which causes the next PushTelemetryResponse to return an empty subscription causing the client to stop pushing the previously subscribed metrics.
Future work
Tracing and logging are outside the scope of this proposal, but the use of OpenTelemetry allows those two additional methods to be added later within the same semantics and nomenclature outlined by this proposal.
Compatibility, Deprecation, and Migration Plan
What impact (if any) will there be on existing users?
Clients
Short of the configuration property to disable telemetry (enable.metrics.push=false; enabled by default) there are no user-facing changes to client users.
Depending on metrics subscription intervals there might be increased CPU and network load, but for modestly set subscription intervals this should be negligible.
Broker
The Kafka admin needs to explicitly configure a metrics reporter plugin that supports the new push-based telemetry interface. If no such plugin is configured, telemetry collection is disabled and there will be no user-facing changes.
Depending on the metrics subscription patterns and intervals there is likely to be increased CPU and network load as brokers will receive, decompress and forward pushed client telemetry to the metrics plugin.
If we are changing behavior how will we phase out the older behavior?
No behavioural changes.
If we need special migration tools, describe them here.
None.
When will we remove the existing behavior?
Not relevant.
Rejected Alternatives
Send metrics out-of-band directly to collector
There are plenty of existing solutions that allow a client implementation to send metrics directly to a collector, but they fall short of meeting the enabled-by-default requirement of this KIP:
- will require additional client-side configuration: endpoints, authentication, etc.
- may require additional network filtering and/or routing configuration to allow the client host to reach the collector endpoints. By using the Kafka protocol we already have a usable connection.
- adds another network protocol that the client needs to handle, along with its runtime dependencies (libraries, etc).
Dedicated Metrics coordinator based on client instance id
An earlier version of this proposal had the notion of a metrics coordinator which was selected based on the client instance id. This metrics coordinator would be responsible for rate-limiting pushed metrics per client instance. To avoid each client having to maintain a connection to its metrics coordinator, the proposal suggested using the impersonation/request-forwarding functionality suggested in a contemporary design by Jason G.
Since this made the broker-side implementation more complex and did not provide fool-proof control of misbehaving clients (a rogue client could simply generate new instance ids on each request), it was decided that the value of this functionality was not on par with the implementation cost, and it was removed from the proposal.
JMX
Scales poorly, Java-specific, commonly mistaken for a trick bike.
Combine with Dynamic client configuration
The Push interface is well suited for propagating client configuration to clients, but to keep scope down we leave that out of this proposal. It should be straightforward to extend and repurpose the interface described in this proposal to also support dynamic client configuration, log/event retrieval, etc, at a later time.
Use native Kafka protocol framing for metrics
The Kafka protocol has flexible fields that would allow us to construct the metrics serialization with pure Kafka protocol fields, but this would duplicate the efforts of the OpenTelemetry specification where Kafka would have to maintain its own specification. On the broker side the native Kafka protocol metrics would need to be converted to OpenTelemetry (or other) format anyway.
The OpenTelemetry serialization format is all Protobufs and should thus be generally available across all client implementation languages.