Status
Current state: Approved
Discussion thread: here and now here
JIRA: KAFKA-15601
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
For metrics to be collected, a MetricsPlugin which implements the ClientTelemetry interface must be configured on the brokers, and at least one metrics subscription must be configured through the Admin API. Only then will metrics subscriptions be propagated to clients, and only then will clients push metrics to the broker. It is thus up to the cluster operator to explicitly enable client metrics collection.
Because this KIP includes a new kind of ConfigResource, this KIP is only supported for KRaft clusters. This avoids doing the work to store these new resources in ZooKeeper, given that ZooKeeper support is already deprecated.
Client metrics subscription
...
As it is not feasible for a Kafka client instance to automatically generate or acquire a unique identity for the application process it runs in, and as we can’t rely on the user to configure one, we treat the application instance id as an optional future nice-to-have that may be included as a metrics label if it has been set by the user. This allows a more direct mapping of client instance to application instance and vice versa. However, due to these constraints and the need for zero-configuration on the client, adding an application instance id configuration property is outside the scope of this proposal.
Kafka Streams applications have an application.id configured, and this identifier should be included as the application_id metrics label.
Mapping
Mapping the client instance id to an actual application instance running on a (virtual) machine can be done by inspecting the metrics resource labels, such as the client source address and source port, or security principal, all of which are added by the receiving broker. This will allow the operator together with the user to identify the actual application instance.
...
Metric payloads are encoded as OpenTelemetry MetricsData v1 protobuf objects.
Naming
...
Metrics are to be sent as either DELTA or CUMULATIVE values, depending on the value of DeltaTemporality in the GetTelemetrySubscriptionsResponse. Clients must support both temporalities.
CUMULATIVE metrics allow for missed/dropped transmits without loss of precision at the cost of increased processing and complexity required in upstream systems.
While clients must support both temporalities, the broker will initially only send GetTelemetrySubscriptionsResponse.DeltaTemporality=True. Configuration properties or extensions to the Metrics plugin interface on the broker to change the temporality are outside the scope of this KIP and may be addressed at a later time as the need arises.
See OTLP specification for more information on temporality.
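A single monotonic counter shows the difference between the two temporalities. The Java sketch below is illustrative only. TemporalitySketch and emit are hypothetical names and are not part of the Kafka client. The sketch assumes the counter starts at zero and is sampled once per push.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the Kafka client): shows what a client would
// emit for one monotonic counter under each temporality setting.
final class TemporalitySketch {
    private TemporalitySketch() {}

    /**
     * @param cumulativeSamples counter values read at each push, assumed non-decreasing
     * @param deltaTemporality  value of GetTelemetrySubscriptionsResponse.DeltaTemporality
     */
    static List<Long> emit(long[] cumulativeSamples, boolean deltaTemporality) {
        List<Long> emitted = new ArrayList<>();
        long previous = 0L; // assumption: the counter starts at zero
        for (long current : cumulativeSamples) {
            // DELTA: change since the previous push. CUMULATIVE: the running total.
            emitted.add(deltaTemporality ? current - previous : current);
            previous = current;
        }
        return emitted;
    }
}
```

Take cumulative samples 10, 25, 25, 40. The sketch emits 10, 15, 0, 15 with DeltaTemporality=True and 10, 25, 25, 40 otherwise. Suppose the second push were lost. The delta series would permanently miss 15, but the next cumulative value (25) would still be correct. This is the trade-off described above.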
Serialized size
As an example, the serialized size prior to compression of all producer and standard metrics defined in this KIP, for a producer producing to 50 partitions, is approximately 100 kB.
...
Defining standard and required metrics makes the monitoring and troubleshooting of clients from various client types easier because the operator can combine the same metric from all client types. For example, request latencies for all client types are reported in a consistent way. Care has been taken to make these standard metrics as generic as possible, and they should fit most Kafka client implementations.
...
Telemetry metric name | OTLP metric data point type | Labels | Description | Existing Kafka metric name |
---|---|---|---|---|
| Gauge | | The rate of connections established per second. | "connection-creation-rate", group="producer-metrics" |
| Sum | | The total number of connections established. | "connection-creation-total", group="producer-metrics" |
| Gauge | node_id | The average request latency in ms for a node. | "request-latency-avg", group="producer-node-metrics" |
| Gauge | node_id | The maximum request latency in ms for a node. | "request-latency-max", group="producer-node-metrics" |
| Gauge | | The average time in ms a request was throttled by the broker. | "produce-throttle-time-avg", group="producer-metrics" |
| Gauge | | The maximum time in ms a request was throttled by the broker. | "produce-throttle-time-max", group="producer-metrics" |
| Gauge | | The average time in ms record batches spent in the send buffer. | "record-queue-time-avg", group="producer-metrics" |
| Gauge | | The maximum time in ms record batches spent in the send buffer. | "record-queue-time-max", group="producer-metrics" |
...
Telemetry metric name | OTLP metric data point type | Labels | Description | Existing Kafka metric name |
---|---|---|---|---|
| Gauge | | The rate of connections established per second. | "connection-creation-rate", group="consumer-metrics" |
| Sum | | The total number of connections established. | "connection-creation-total", group="consumer-metrics" |
| Gauge | node_id | The average request latency in ms for a node. | "request-latency-avg", group="consumer-node-metrics" |
| Gauge | node_id | The maximum request latency in ms for a node. | "request-latency-max", group="consumer-node-metrics" |
| Gauge | | The average fraction of time the consumer’s poll() is idle as opposed to waiting for the user code to process records. | "poll-idle-ratio-avg", group="consumer-metrics" |
| Gauge | | The average time in ms taken for a commit request. | "commit-latency-avg", group="consumer-coordinator-metrics" |
| Gauge | | The maximum time in ms taken for a commit request. | "commit-latency-max", group="consumer-coordinator-metrics" |
| Gauge | | The number of partitions currently assigned to this consumer. | "assigned-partitions", group="consumer-coordinator-metrics" |
| Gauge | | The average time in ms taken for a group rebalance. | "rebalance-latency-avg", group="consumer-coordinator-metrics" |
| Gauge | | The maximum time in ms taken for a group rebalance. | "rebalance-latency-max", group="consumer-coordinator-metrics" |
| Sum | | The total time in ms taken for group rebalances. | "rebalance-latency-total", group="consumer-coordinator-metrics" |
| Gauge | | The average time in ms taken for a fetch request. | "fetch-latency-avg", group="consumer-fetch-manager-metrics" |
| Gauge | | The maximum time in ms taken for a fetch request. | "fetch-latency-max", group="consumer-fetch-manager-metrics" |
...
The following labels should be added by the client as appropriate before metrics are pushed.
Label name | Description |
---|---|
application_id | application.id (Kafka Streams only) |
client_rack | client.rack (if configured) |
group_id | group.id (consumer) |
group_instance_id | group.instance.id (consumer) |
group_member_id | Group member id (if any, consumer) |
transactional_id | transactional.id (producer) |
Broker-added resource labels
The following labels should be added by the broker plugin as metrics are received.
Label name | Description |
---|---|
client_instance_id | The generated CLIENT_INSTANCE_ID. |
client_id | client.id as reported in the Kafka protocol header. |
client_software_name | The client’s implementation name as reported in ApiVersionRequest. |
client_software_version | The client’s version as reported in ApiVersionRequest. |
client_source_address | The client connection’s source address. |
client_source_port | The client connection’s source port. |
principal | Client’s security principal. Content depends on authentication method. |
node_id | Receiving broker’s node-id. |
If the received metrics have resource labels which clash with those added by the broker, the broker-added values overwrite the received values.
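The sketch below shows this precedence rule. It assumes resource labels are modeled as a plain string map. ResourceLabelMerge is a hypothetical class, and a real plugin would operate on OTLP Resource attributes instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of how a broker metrics plugin might merge resource labels.
final class ResourceLabelMerge {
    private ResourceLabelMerge() {}

    static Map<String, String> merge(Map<String, String> clientLabels,
                                     Map<String, String> brokerLabels) {
        // Start from the client-supplied labels...
        Map<String, String> merged = new HashMap<>(clientLabels);
        // ...then apply the broker-added labels. putAll replaces any clashing key,
        // so the broker's value wins on conflict.
        merged.putAll(brokerLabels);
        return merged;
    }
}
```

With this ordering, a client cannot spoof broker-added labels such as client_id or principal.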
Client behavior
A client that supports this metric interface and identifies a supporting broker (through detecting at least GetTelemetrySubscriptionsRequestV0 in the ApiVersionResponse) will start off by sending a GetTelemetrySubscriptionsRequest with the ClientInstanceId field set to Null to one randomly selected connected broker. The response provides its client instance id, the subscribed metrics, the push interval, accepted compression types, and so on. This handshake with a Null ClientInstanceId is only performed once for a client instance's lifetime. Subsequent GetTelemetrySubscriptionsRequests must include the ClientInstanceId returned in the first response, regardless of broker.
If a client attempts a subsequent handshake with a Null ClientInstanceId, the receiving broker may not already know the client's existing ClientInstanceId. If the receiving broker knows the existing ClientInstanceId, it simply responds with the existing value. If it does not know the existing ClientInstanceId, it creates a new client instance ID and responds with that.
Upon receiving the GetTelemetrySubscriptionsResponse, the client shall update its internal metrics collection to match the received subscription (absolute update) and update its push interval timer according to the received PushIntervalMs. The first metrics push should be randomized between 0.5 * PushIntervalMs and 1.5 * PushIntervalMs. This ensures that not all clients start pushing metrics at the same time when a cluster comes back up after some downtime.
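One way to implement the randomized first push is sketched below. FirstPushJitter is a hypothetical helper. It assumes a uniform distribution over the half-open range [0.5 * PushIntervalMs, 1.5 * PushIntervalMs), which is one reasonable reading of "randomized between".

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: delay before the first PushTelemetryRequest after a
// GetTelemetrySubscriptionsResponse has been received.
final class FirstPushJitter {
    private FirstPushJitter() {}

    static long firstPushDelayMs(int pushIntervalMs) {
        if (pushIntervalMs <= 0) {
            throw new IllegalArgumentException("pushIntervalMs must be positive");
        }
        long low = pushIntervalMs / 2L;        // 0.5 * PushIntervalMs, rounded down
        long high = pushIntervalMs * 3L / 2L;  // 1.5 * PushIntervalMs, rounded down
        // nextLong(origin, bound) is uniform over [origin, bound).
        return ThreadLocalRandom.current().nextLong(low, high);
    }
}
```

With the default PushIntervalMs of 300000, the first push lands somewhere between 150000 ms and just under 450000 ms.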
If GetTelemetrySubscriptionsResponse.RequestedMetrics indicates that no metrics are desired (RequestedMetrics is Null), the client should send a new GetTelemetrySubscriptionsRequest after the PushIntervalMs has expired. This is to avoid having to restart clients if the cluster metrics configuration is disabled temporarily by operator error or maintenance such as rolling upgrades. The default PushIntervalMs is 300000 ms (5 minutes).
If GetTelemetrySubscriptionsResponse.RequestedMetrics is non-empty but does not match any metrics the client provides, then the client should send a PushTelemetryRequest at the indicated PushIntervalMs interval with an empty metrics blob. This is needed so that a broker metrics plugin can differentiate between non-responsive or buggy clients and clients that don't have metrics matching the subscription set.
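The sketch below combines the subscription rules above with the RequestedMetrics encoding from the protocol definition. An empty array means no metrics, "*" as the first element means all metrics, and anything else is a prefix match. SubscriptionMatcher is hypothetical, and it treats Null and an empty array the same way.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: choose which of the client's metric names fall under the
// RequestedMetrics of a GetTelemetrySubscriptionsResponse.
final class SubscriptionMatcher {
    private SubscriptionMatcher() {}

    static List<String> selectMetrics(List<String> requestedMetrics, List<String> clientMetrics) {
        List<String> selected = new ArrayList<>();
        if (requestedMetrics == null || requestedMetrics.isEmpty()) {
            // No metrics subscribed: push nothing, re-request the subscription after PushIntervalMs.
            return selected;
        }
        boolean all = "*".equals(requestedMetrics.get(0)); // Array[0] "*": all metrics
        for (String name : clientMetrics) {
            if (all || requestedMetrics.stream().anyMatch(name::startsWith)) {
                selected.add(name); // prefix string match
            }
        }
        return selected;
    }
}
```

An empty result for a non-empty subscription still leads to a PushTelemetryRequest with an empty metrics blob at each PushIntervalMs. This lets the broker tell such clients apart from unresponsive ones.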
...
- KafkaAdminClient: the "admin client runnable" thread
  - KafkaAdminClient creates a dedicated thread to execute the AdminClientRunnable inner class
  - The AdminClientRunnable's processRequests method loops, polling for network requests via NetworkClient's poll method
- KafkaConsumer (existing code): both the "heartbeat" and application threads
  - KafkaConsumer creates a ConsumerNetworkClient (which wraps a NetworkClient) for network communication
  - KafkaConsumer also creates a ConsumerCoordinator to manage consumer group members
    - The ConsumerCoordinator is provided with a reference to the ConsumerNetworkClient instance from the KafkaConsumer
    - The ConsumerCoordinator (via its AbstractCoordinator superclass) maintains an inner thread class named HeartbeatThread
    - The HeartbeatThread's run method loops and invokes the ConsumerNetworkClient's poll method
  - The KafkaConsumer's poll method, invoked by the caller on an application thread, also invokes the ConsumerNetworkClient's poll method
  - Thus when either the heartbeat thread runs or the application thread polls for new records, the internal NetworkClient's poll method is invoked
    - Synchronization is performed by the ConsumerNetworkClient to make sure two threads don't access the inner NetworkClient concurrently
- KafkaConsumer (consumer threading refactor code as a part of the KIP-848 effort): the "background" thread
- KafkaProducer: the "sender" thread
  - The KafkaProducer creates a Sender to run in a dedicated thread to manage produce requests
    - The Sender is provided with a reference to the NetworkClient instance from the KafkaProducer
  - The Sender's run method loops, calling a method named runOnce in each pass
    - The runOnce method polls for network requests via NetworkClient's poll method
Connection selection
The client may send telemetry requests to any broker, but shall prefer using an already available connection rather than creating a new connection to keep the number of cluster connections down.
It should also keep using the same broker connection for telemetry requests until the connection goes down, at which time it may choose to reconnect and continue using the same broker, or switch over to another broker connection. Using a persistent connection for PushTelemetryRequests is important so that metrics throttling can be properly performed by the receiving broker, and also avoids maintaining metrics state for the client instance id on multiple brokers.
Client termination
When a client with an active metrics subscription is being shut down, it should send its final metrics without waiting for the PushIntervalMs time.
To avoid the receiving broker’s metrics rate-limiter discarding this out-of-profile push, the PushTelemetryRequest.Terminating field must be set to true. A broker must only allow one such unthrottled metrics push for each combination of client instance ID and SubscriptionId.
In the event that the client's metric subscription has changed and the final metrics push fails with error code UNKNOWN_SUBSCRIPTION_ID, the terminating client can choose to obtain a new subscription ID by sending a GetTelemetrySubscriptionsRequest and then immediately sending a PushTelemetryRequest with the Terminating flag set to true, or it can choose to abandon sending a final metrics push.
The metrics should contain the reason for the client termination by including the client.terminating metric with the label “reason” set to a human-readable explanation of why the client is being shut down, such as “Fatal error: Static consumer fenced by newer instance” or “Consumer closed by application”.
Error handling
The following error codes can be returned in PushTelemetryResponse.
Error code | Reason | Client action |
---|---|---|
| Client sent a PushTelemetryRequest when it has already sent a Terminating request. | Log an error and stop pushing metrics. This is viewed as a problem in the client implementation of metrics serialization that is not likely to be resolved by retrying. |
INVALID_RECORD (87) | Broker failed to decode or validate the client’s encoded metrics. | Log an error and stop pushing metrics. This is viewed as a problem in the client implementation of metrics serialization that is not likely to be resolved by retrying. |
| Client sent a PushTelemetryRequest larger than the maximum size the broker will accept. | Reduce the size of the metrics payload so its size does not exceed GetTelemetrySubscriptionsResponse.TelemetryMaxBytes. |
UNKNOWN_SUBSCRIPTION_ID (NEW) | Client sent a PushTelemetryRequest with an invalid or outdated SubscriptionId. The configured subscriptions have changed. | Immediately send a GetTelemetrySubscriptionsRequest to update the client's subscriptions and get a new SubscriptionId. |
UNSUPPORTED_COMPRESSION_TYPE (76) | Client’s compression type is not supported by the broker. | Immediately send a GetTelemetrySubscriptionsRequest to get an up-to-date list of the broker's supported compression types (and any subscription changes). |
Retries should preferably be attempted on the same broker connection, in particular for UNKNOWN_SUBSCRIPTION_ID, but another broker connection may be utilized at the discretion of the client.
How errors and warnings are propagated to the application is client- and language-specific. Simply logging the error is sufficient.
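The client-side reaction to the named error codes in the table can be sketched as a simple dispatch. This is a hypothetical Java sketch. The enums and the method are illustrative, and only the rows with a named error code are modeled. The fallback for anything else is an assumption, not part of this KIP.

```java
// Hypothetical client-side sketch of the PushTelemetryResponse error table.
final class PushErrorPolicy {
    private PushErrorPolicy() {}

    enum PushError { NONE, INVALID_RECORD, UNSUPPORTED_COMPRESSION_TYPE, UNKNOWN_SUBSCRIPTION_ID }

    enum Action { CONTINUE, LOG_ERROR_AND_STOP_PUSHING, REFRESH_SUBSCRIPTION_NOW }

    static Action onPushError(PushError error) {
        switch (error) {
            case INVALID_RECORD:
                // Client-side serialization problem; retrying is unlikely to help.
                return Action.LOG_ERROR_AND_STOP_PUSHING;
            case UNSUPPORTED_COMPRESSION_TYPE: // refresh the broker's accepted compression types
            case UNKNOWN_SUBSCRIPTION_ID:      // subscriptions changed; obtain a new SubscriptionId
                // Send a GetTelemetrySubscriptionsRequest immediately,
                // preferably on the same broker connection.
                return Action.REFRESH_SUBSCRIPTION_NOW;
            default:
                // NONE, plus (by assumption) anything not modeled in this sketch.
                return Action.CONTINUE;
        }
    }
}
```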
Java client dependencies
The OpenTelemetry metrics serialization is used as the payload of the telemetry metrics sent to the broker. This will require us to include the Java Bindings for the OpenTelemetry Protocol (OTLP) as a dependency for the Java client.
implementation("io.opentelemetry.proto:opentelemetry-proto:0.19.0-alpha")
The OTLP Java bindings library is a minimal dependency and itself requires only the Google Protobuf serialization library:
implementation("com.google.protobuf:protobuf-java:3.18.0")
This will allow us to build up Java objects in memory that reflect that specification and easily serialize them to the required transport wire format.
The Gradle build process will be updated to shadow the OTLP Java bindings library and its dependencies to avoid in-JVM versioning conflicts.
Receiving broker behavior
We allow clients to send the GetTelemetrySubscriptions and PushTelemetry requests to any connected broker that reports support for both these APIs in its ApiVersionResponse.
If GetTelemetrySubscriptionsRequest.ClientInstanceId is Null, the broker will generate a unique id for the client and return it in the response. Subsequent requests from the client to any broker must have the ClientInstanceId set to this returned value.
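The broker-side sketch below follows the protocol comments. A UUID4 is generated when the request carries Null, and otherwise the response field is Null. ClientInstanceIds is a hypothetical name. The sketch does not model a broker recognizing an existing client that re-handshakes with Null.

```java
import java.util.UUID;

// Hypothetical broker-side helper for the ClientInstanceId field of
// GetTelemetrySubscriptionsResponse.
final class ClientInstanceIds {
    private ClientInstanceIds() {}

    static UUID responseClientInstanceId(UUID requestClientInstanceId) {
        if (requestClientInstanceId == null) {
            // First handshake: UUID.randomUUID() returns a random (version 4) UUID.
            return UUID.randomUUID();
        }
        // The client already has an id; per the schema the response field is Null.
        return null;
    }
}
```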
The broker should add additional metrics labels to help identify the client instance before forwarding the metrics to an external system. These are labels such as the namespace, cluster id, client principal, client source address and port. As the metrics plugin may need to add additional information on top of this, the generic metrics receiver in the broker will not add these labels but rely on the plugins to do so. This avoids deserializing and serializing the received metrics multiple times in the broker.
See Broker-added resource labels above for the list of labels that should be added by the plugin.
The broker only reports support for GetTelemetrySubscriptions and PushTelemetry requests in its ApiVersionResponse if it has a MetricsReporter that implements the ClientTelemetry interface. This means that clients will not attempt to push metrics to brokers that are not capable of receiving them. If there is no client metrics receiver plugin configured on the broker, the client will not send GetTelemetrySubscriptions or PushTelemetry RPCs, and existing behavior is preserved.
PushTelemetryRequest handling
Validation
Validation of the encoded metrics is the task of the ClientMetricsReceiver. If the compression type is unsupported, the response will be returned with ErrorCode set to UNSUPPORTED_COMPRESSION_TYPE. Should decoding or validation of the binary metrics blob fail, the ErrorCode will be set to INVALID_RECORD.
Throttling and rate-limiting
Two mechanisms protect brokers from rogue or buggy clients:
- Standard request throttling - will mute the client connection if user quotas (size and/or request rate) are exceeded.
- Metrics PushIntervalMs rate-limiting - ensures the client does not push telemetry more often than the configured PushIntervalMs (subscription interval). As this rate-limiting state is maintained separately by each broker that the client sends telemetry requests to, the client can send at most one accepted out-of-profile push per connection before the rate-limiter kicks in. The metrics plugin itself may also put constraints on the maximum allowed metrics payload.
The receiving broker’s standard quota-based throttling should operate as usual for PushTelemetryRequest. In addition, the PushTelemetryRequest is also subject to rate-limiting based on the calculated next desired PushIntervalMs interval derived from the configured metrics subscriptions. Should the client send a push request before the previously calculated PushIntervalMs has expired, the broker will discard the metrics and return a PushTelemetryResponse with the ErrorCode set to THROTTLING_QUOTA_EXCEEDED.
The one exception to this rule is when the client sets the PushTelemetryRequest.Terminating field to true to indicate that it is terminating. In this case the metrics should be accepted by the broker, but a consecutive request must ignore the Terminating field and apply rate-limiting as if the field was not set. The Terminating flag may be reused upon the next expiry of PushIntervalMs.
In case the cluster load induced by metrics requests becomes unmanageable, the remedy is to temporarily remove or limit configured metrics subscriptions.
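The PushIntervalMs rate-limiting and its Terminating exception can be sketched as per-client-instance state on one broker. This is a hypothetical Java sketch, not the Kafka implementation. It assumes the state is keyed by client instance id. It also assumes that the single Terminating allowance is tracked per SubscriptionId and re-armed when the interval expires.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical broker-side sketch of PushIntervalMs rate-limiting with the
// Terminating exception, tracked per client instance id.
final class PushRateLimiter {
    enum Decision { ACCEPT, THROTTLING_QUOTA_EXCEEDED }

    private static final class State {
        long nextAllowedMs = Long.MIN_VALUE; // first push from an unknown client is accepted
        Integer terminatingSubscriptionId;   // SubscriptionId that used its Terminating push
    }

    private final Map<UUID, State> states = new HashMap<>();

    synchronized Decision onPush(UUID clientInstanceId, int subscriptionId,
                                 boolean terminating, long nowMs, long pushIntervalMs) {
        State s = states.computeIfAbsent(clientInstanceId, id -> new State());
        if (nowMs >= s.nextAllowedMs) {
            // In profile: accept, start the next interval and re-arm the Terminating allowance.
            s.nextAllowedMs = nowMs + pushIntervalMs;
            s.terminatingSubscriptionId = null;
            return Decision.ACCEPT;
        }
        if (terminating && !Integer.valueOf(subscriptionId).equals(s.terminatingSubscriptionId)) {
            // Early, but this is the one unthrottled Terminating push for this SubscriptionId.
            s.terminatingSubscriptionId = subscriptionId;
            return Decision.ACCEPT;
        }
        // Out of profile: the broker discards the metrics.
        return Decision.THROTTLING_QUOTA_EXCEEDED;
    }
}
```

An early Terminating push does not move nextAllowedMs. A second early Terminating push is therefore rate-limited as if the flag were not set.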
Metrics subscription
Metrics subscriptions are configured through the standard Kafka Admin API configuration interface with the new config resource type CLIENT_METRICS.
The ConfigResource has a name and is of type CLIENT_METRICS. The resource name does not have significance to the metrics system other than to group metrics subscriptions in the configuration interface.
...
- metrics: a comma-separated list of telemetry metric name prefixes, e.g., "org.apache.kafka.producer.node.request.latency., org.apache.kafka.consumer.coordinator.rebalance.latency.max". Whitespace is ignored.
- interval.ms: metrics push interval in milliseconds. Defaults to 300000 ms (5 minutes) if not specified.
- match: client matching selector that is evaluated as a list of anchored regular expressions (i.e., "something.*" is treated as "^something.*$"). Any client that matches all of the selectors will be eligible for this metrics subscription. The regular expressions are compiled and executed using Google RE2/J. Initially supported selectors are:
  - client_instance_id: CLIENT_INSTANCE_ID UUID string representation.
  - client_id: client's reported client.id in the GetTelemetrySubscriptionsRequest.
  - client_software_name: client software implementation name.
  - client_software_version: client software implementation version.
  - client_source_address: client connection's source address from the broker's point of view.
  - client_source_port: client connection's source port from the broker's point of view.
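The "match all selectors, each fully anchored" rule can be sketched as follows. This KIP specifies Google RE2/J. The sketch substitutes java.util.regex only to stay dependency-free. Matcher.matches() requires the whole value to match, which gives the same anchored "^...$" semantics for simple patterns. MatchSelectors is a hypothetical name. By assumption, a selector whose attribute the client lacks counts as a non-match.

```java
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch: evaluate a subscription's match selectors against a
// client's attributes (client_id, client_software_name, ...).
// java.util.regex stands in for RE2/J here.
final class MatchSelectors {
    private MatchSelectors() {}

    static boolean matchesAll(Map<String, String> selectors, Map<String, String> clientAttributes) {
        for (Map.Entry<String, String> selector : selectors.entrySet()) {
            String value = clientAttributes.get(selector.getKey());
            // matches() is anchored at both ends: "kafka" does not match "apache-kafka-java".
            if (value == null || !Pattern.compile(selector.getValue()).matcher(value).matches()) {
                return false; // every selector must match
            }
        }
        return true;
    }
}
```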
For example, using the standard kafka-configs.sh tool to create a metrics subscription:
```shell
$ kafka-configs.sh --bootstrap-server $BROKERS \
--entity-type client-metrics \
--entity-name "basic_producer_metrics" \
--alter \
--add-config "metrics=[org.apache.kafka.producer., org.apache.kafka.consumer.coordinator.rebalance.latency.max],interval.ms=15000,match=[client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538]"
```
There is also a new kafka-client-metrics.sh tool, described later, that has an easier syntax.
...
When a GetTelemetrySubscriptionsRequest is received for a previously unknown client instance id, the CLIENT_METRICS config cache is scanned for any configured metric subscriptions whose match selectors match that of the client. The resulting matching configuration entries are compiled into a list of subscribed metrics. This list is returned in GetTelemetrySubscriptionsResponse.RequestedMetrics along with the minimum configured collection interval. (This can be improved in future versions by including a per-metric interval so that each subscribed metric is collected with its configured interval. In its current form, longer-interval metrics are included “for free” if there are shorter-interval metrics in the subscription set.) A CRC32C checksum is also calculated based on the compiled metrics. It is returned as the SubscriptionId in the response and stored in the per-client-instance cache on the broker to track configuration changes.
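The compilation step can be sketched as follows. This is a hypothetical Java sketch that needs Java 9+ for java.util.zip.CRC32C. This KIP states only that a CRC32C checksum of the compiled metrics is used. The exact bytes fed into the checksum (sorted, de-duplicated prefixes joined by ',') are an assumption, as are the class and field names. When no entries match, the sketch falls back to the documented interval.ms default.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.zip.CRC32C;

// Hypothetical sketch: compile the CLIENT_METRICS entries that matched a client
// into the RequestedMetrics, PushIntervalMs and SubscriptionId of the response.
final class SubscriptionCompiler {
    private SubscriptionCompiler() {}

    static final int DEFAULT_INTERVAL_MS = 300_000; // documented interval.ms default

    /** One matched config entry: its metrics prefixes and interval.ms. */
    static final class Entry {
        final List<String> metricPrefixes;
        final int intervalMs;

        Entry(List<String> metricPrefixes, int intervalMs) {
            this.metricPrefixes = metricPrefixes;
            this.intervalMs = intervalMs;
        }
    }

    static final class Result {
        final List<String> requestedMetrics;
        final int pushIntervalMs;
        final int subscriptionId;

        Result(List<String> requestedMetrics, int pushIntervalMs, int subscriptionId) {
            this.requestedMetrics = requestedMetrics;
            this.pushIntervalMs = pushIntervalMs;
            this.subscriptionId = subscriptionId;
        }
    }

    static Result compile(List<Entry> matched) {
        TreeSet<String> prefixes = new TreeSet<>(); // sorted and de-duplicated
        int intervalMs = matched.isEmpty() ? DEFAULT_INTERVAL_MS : Integer.MAX_VALUE;
        for (Entry e : matched) {
            prefixes.addAll(e.metricPrefixes);
            intervalMs = Math.min(intervalMs, e.intervalMs); // lowest configured interval wins
        }
        // Assumption: the checksum covers the sorted prefixes joined by ','.
        CRC32C crc = new CRC32C();
        crc.update(String.join(",", prefixes).getBytes(StandardCharsets.UTF_8));
        return new Result(new ArrayList<>(prefixes), intervalMs, (int) crc.getValue());
    }
}
```

The prefixes are sorted before hashing, so the SubscriptionId does not depend on the order in which config entries are scanned. Any change to the subscribed prefixes changes the checksum, barring a collision.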
...
Public Interfaces
Kafka Protocol Changes
These new RPCs are only supported on KRaft clusters.
```
GetTelemetrySubscriptionsRequestV0 {
    ClientInstanceId uuid                 // UUID4 unique for this client instance.
                                          // Must be set to Null on the first request, and to the returned ClientInstanceId
                                          // from the first response for all subsequent requests to any broker.
}

GetTelemetrySubscriptionsResponseV0 {
    ThrottleTimeMs int32                  // The duration in milliseconds for which the request was throttled due to a quota violation,
                                          // or zero if the request did not violate any quota.
    ErrorCode int16                       // The error code, or 0 if there was no error.
    ClientInstanceId uuid                 // Assigned client instance id if ClientInstanceId was Null in the request, else Null.
    SubscriptionId int32                  // Unique identifier for the current subscription set for this client instance.
    AcceptedCompressionTypes Array[int8]  // The compression types the broker accepts for PushTelemetryRequest.CompressionType
                                          // as listed in MessageHeaderV2.Attributes.CompressionType. The array will be sorted in
                                          // preference order from higher to lower. The CompressionType of NONE will not be
                                          // present in the response from the broker, though the broker does support uncompressed
                                          // client telemetry if none of the accepted compression codecs are supported by the client.
    PushIntervalMs int32                  // Configured push interval, which is the lowest configured interval in the current subscription set.
    TelemetryMaxBytes int32               // The maximum bytes of binary data the broker accepts in PushTelemetryRequest.
    DeltaTemporality bool                 // If True; monotonic/counter metrics are to be emitted as deltas to the previous sample.
                                          // If False; monotonic/counter metrics are to be emitted as cumulative absolute values.
    RequestedMetrics Array[string]        // Requested telemetry metrics prefix string match.
                                          // Empty array: No metrics subscribed.
                                          // Array[0] "*": All metrics subscribed.
                                          // Array[..]: prefix string match.
}

PushTelemetryRequestV0 {
    ClientInstanceId uuid                 // UUID4 unique for this client instance, as retrieved in the first GetTelemetrySubscriptionsResponse.
    SubscriptionId int32                  // SubscriptionId from the GetTelemetrySubscriptionsResponse for the collected metrics.
    Terminating bool                      // Client is terminating.
    CompressionType int8                  // Compression codec used for .Metrics (ZSTD, LZ4, Snappy, GZIP, None).
                                          // Same values as that of the current MessageHeaderV2.Attributes.
    Metrics binary                        // Metrics encoded in OpenTelemetry MetricsData v1 protobuf format.
}

PushTelemetryResponseV0 {
    ThrottleTimeMs int32                  // The duration in milliseconds for which the request was throttled due to a quota violation,
                                          // or zero if the request did not violate any quota.
    ErrorCode int16                       // The error code, or 0 if there was no error.
}
```
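The RequestedMetrics and DeltaTemporality fields of GetTelemetrySubscriptionsResponseV0 can be interpreted on the client roughly as sketched below. This is an illustration under assumptions, not the client's actual code: SubscriptionHandling and CounterTemporality are hypothetical names, the first delta is assumed to equal the first cumulative value (as if counting from zero), and counter resets are not handled.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class SubscriptionHandling {

    // Builds a metric-name filter following the RequestedMetrics field comments:
    // empty array -> no metrics, a single "*" -> all metrics, otherwise prefix match.
    static Predicate<String> selector(List<String> requestedMetrics) {
        if (requestedMetrics.isEmpty()) {
            return name -> false;
        }
        if (requestedMetrics.size() == 1 && requestedMetrics.get(0).equals("*")) {
            return name -> true;
        }
        // name::startsWith tests whether the metric name starts with each requested prefix.
        return name -> requestedMetrics.stream().anyMatch(name::startsWith);
    }

    // Remembers the last cumulative value per counter so that, when DeltaTemporality is true,
    // each push reports only the increase since the previous sample.
    static final class CounterTemporality {
        private final boolean deltaTemporality;
        private final Map<String, Long> lastCumulative = new HashMap<>();

        CounterTemporality(boolean deltaTemporality) {
            this.deltaTemporality = deltaTemporality;
        }

        long valueToEmit(String metricName, long cumulativeValue) {
            if (!deltaTemporality) {
                return cumulativeValue; // cumulative absolute value
            }
            Long previous = lastCumulative.put(metricName, cumulativeValue);
            return previous == null ? cumulativeValue : cumulativeValue - previous;
        }
    }
}
```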
Metrics serialization format
The metrics format for PushTelemetryRequestV0 is OpenTelemetry Protobuf protocol definitions version 0.19.
Future versions of PushTelemetryRequest and GetTelemetrySubscriptionsRequest may include a content-type field to allow for updated OTLP format versions (or additional formats), but this field is currently not included since only one format is specified by this proposal.
SubscriptionId
The SubscriptionId is a unique identifier for a client instance's subscription set. It is generated by calculating a CRC32C checksum of the configured metrics subscriptions matching the given client, including the PushIntervalMs, XORed with the ClientInstanceId. This SubscriptionId is returned to the client in the GetTelemetrySubscriptionsResponse, and the client passes that SubscriptionId in each subsequent PushTelemetryRequest for those received metrics. If the configured subscriptions are updated in a way that changes a client instance's subscription set, the SubscriptionId is recalculated. Upon the next PushTelemetryRequest using the previous SubscriptionId, the broker will find that the received and expected SubscriptionIds differ and will return UNKNOWN_SUBSCRIPTION_ID to the client. When a client receives this error code, it will immediately send a GetTelemetrySubscriptionsRequest to retrieve the new subscription set along with a new SubscriptionId.
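A minimal sketch of the SubscriptionId calculation. The text does not fix how the subscriptions and PushIntervalMs are serialized before hashing, nor how the 128-bit ClientInstanceId is reduced to 32 bits for the XOR; this sketch assumes a comma-joined prefix list followed by the interval, and UUID.hashCode(). SubscriptionIds is a hypothetical name; java.util.zip.CRC32C requires Java 9 or later.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import java.util.zip.CRC32C;

final class SubscriptionIds {

    static int compute(List<String> requestedMetrics, int pushIntervalMs, UUID clientInstanceId) {
        // Assumed serialization: metric prefixes in order, then the push interval.
        byte[] bytes = (String.join(",", requestedMetrics) + "|" + pushIntervalMs)
                .getBytes(StandardCharsets.UTF_8);
        CRC32C crc = new CRC32C();
        crc.update(bytes, 0, bytes.length);
        // CRC32C is a 32-bit value returned in a long; keep the low 32 bits as an int32.
        int checksum = (int) crc.getValue();
        // Assumption: fold the 128-bit UUID into 32 bits with UUID.hashCode() before XORing.
        return checksum ^ clientInstanceId.hashCode();
    }
}
```

Because the checksum covers PushIntervalMs, changing only the interval of a matching subscription still produces a new SubscriptionId, so the broker's comparison against the id sent in PushTelemetryRequest detects that change too.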
...
This also allows a plugin to reuse the existing labels passed through the MetricsContext (e.g. broker, cluster id, and configured labels) and add them to the OpenTelemetry resource labels as needed.
The broker only reports support for GetTelemetrySubscriptions and PushTelemetry requests in its ApiVersionResponse if it has a MetricsReporter that implements the ClientTelemetry interface. This means that clients will not attempt to push metrics to brokers that are not capable of receiving them.
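A minimal sketch of that capability check. Reporter and TelemetryCapable are hypothetical stand-ins playing the roles of MetricsReporter and ClientTelemetry so the example compiles without Kafka on the classpath; this is not the broker's actual code.

```java
import java.util.List;

// Hypothetical stand-ins for MetricsReporter and ClientTelemetry.
interface Reporter {}

interface TelemetryCapable {}

// A reporter that only handles broker metrics.
final class PlainReporter implements Reporter {}

// A reporter that can also receive client telemetry.
final class TelemetryReporter implements Reporter, TelemetryCapable {}

final class TelemetryApiAdvertisement {
    // GetTelemetrySubscriptions and PushTelemetry would be listed in the ApiVersionResponse
    // only if at least one configured reporter also implements the telemetry interface.
    static boolean advertiseTelemetryApis(List<Reporter> reporters) {
        return reporters.stream().anyMatch(r -> r instanceof TelemetryCapable);
    }
}
```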
```java
/**
 * A {@link MetricsReporter} may implement this interface to indicate support for collecting client
 * telemetry on the server side.
 */
@InterfaceStability.Evolving
public interface ClientTelemetry {

    /**
     * Called by the broker to create a ClientTelemetryReceiver instance.
     * This instance may be cached by the broker.
     *
     * This method will always be called after the initial call to
     * {@link MetricsReporter#contextChange(MetricsContext)}
     * on the MetricsReporter implementing this interface.
     *
     * @return the ClientTelemetryReceiver instance
     */
    ClientTelemetryReceiver clientReceiver();
}

@InterfaceStability.Evolving
public interface ClientTelemetryReceiver {

    /**
     * Called by the broker when a client reports telemetry. The associated request context can be used
     * by the plugin to retrieve additional client information such as client ids or endpoints.
     *
     * This method may be called from the request handling thread, and as such should avoid blocking.
     *
     * @param context the client request context for the corresponding PushTelemetryRequest api call
     * @param payload the encoded telemetry payload as sent by the client
     */
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);
}

@InterfaceStability.Evolving
public interface ClientTelemetryPayload {

    /**
     * Client's instance id.
     */
    Uuid clientInstanceId();

    /**
     * Indicates whether the client is terminating and sending its last metrics push.
     */
    boolean isTerminating();

    /**
     * Metrics data content-type / serialization format.
     * Currently "application/x-protobuf;type=otlp+metrics0.19"
     */
    String contentType();

    /**
     * Serialized uncompressed metrics data.
     */
    ByteBuffer data();
}
```
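The receiver Javadoc asks exportMetrics() to avoid blocking because it may run on the request handling thread. One way to honor that, sketched here as an assumption rather than a prescribed design, is to hand each payload to a bounded queue that a separate exporter thread drains. Payload and QueueingTelemetryReceiver are hypothetical names; Payload mirrors the accessors of ClientTelemetryPayload (with java.util.UUID in place of Kafka's Uuid) so the sketch compiles without Kafka.

```java
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ClientTelemetryPayload.
record Payload(UUID clientInstanceId, boolean isTerminating, String contentType, ByteBuffer data) {}

final class QueueingTelemetryReceiver {
    private final BlockingQueue<Payload> queue;
    private final AtomicLong dropped = new AtomicLong();

    QueueingTelemetryReceiver(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Called on the request handling thread: offer() returns immediately instead of
    // waiting for space (unlike put()), so a slow back end never stalls request handling.
    void exportMetrics(Payload payload) {
        if (!queue.offer(payload)) {
            dropped.incrementAndGet();
        }
    }

    // Called from a separate exporter thread that forwards payloads to the metrics back end;
    // returns null when the queue is empty.
    Payload poll() {
        return queue.poll();
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

Dropping when the queue is full trades completeness for predictable request latency; a plugin that must not lose data would need a different overflow policy.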
...
Retrieves the broker-generated client instance id. The application may use it to help map the client instance id to the application instance, through log messages or other means.
The client instance ids returned correspond to the client_instance_id labels added by the broker to the metrics pushed from the clients. This should be sufficient information to enable correlation between the metrics available in the client, and the metrics pushed to the broker.
The following method is added to the Producer, Consumer, and Admin client interfaces:
...
 * @throws InterruptException If the thread is interrupted while blocked.
 * @throws TimeoutException Indicates that a request timed out.
 * @throws KafkaException If an unexpected error occurs while trying to determine the client
 *         instance ID, though this error does not necessarily imply the
 *         consumer client is otherwise unusable.
 * @throws IllegalArgumentException If the {@code timeout} is negative.
 * @throws IllegalStateException If telemetry is not enabled, i.e., config {@code enable.metrics.push}
 *         is set to {@code false}.
 */
public Uuid clientInstanceId(Duration timeout);
If the client has not yet requested a client instance id from the broker, this call may block up to the duration of the timeout. In the event that the client instance id cannot be obtained within the timeout, the method throws org.apache.kafka.common.errors.TimeoutException.
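A minimal sketch of the blocking behavior described above, assuming the client completes a future once the first GetTelemetrySubscriptionsResponse assigns the id. ClientInstanceIdHolder and onSubscriptionResponse are hypothetical names, and the sketch uses java.util.UUID and java.util.concurrent.TimeoutException in place of Kafka's Uuid and org.apache.kafka.common.errors.TimeoutException.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ClientInstanceIdHolder {
    private final boolean metricsPushEnabled;
    private final CompletableFuture<UUID> assignedId = new CompletableFuture<>();

    ClientInstanceIdHolder(boolean metricsPushEnabled) {
        this.metricsPushEnabled = metricsPushEnabled;
    }

    // Called when the first GetTelemetrySubscriptionsResponse returns a ClientInstanceId.
    void onSubscriptionResponse(UUID clientInstanceId) {
        assignedId.complete(clientInstanceId);
    }

    UUID clientInstanceId(Duration timeout) throws TimeoutException, InterruptedException {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        if (!metricsPushEnabled) {
            throw new IllegalStateException("enable.metrics.push is false");
        }
        try {
            // Returns immediately if the id is already known, otherwise waits up to the timeout.
            return assignedId.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new IllegalStateException("could not obtain client instance id", e.getCause());
        }
    }
}
```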
In addition, the following method is added to the KafkaStreams class to give access to the client instance ids of the producers, consumers and admin clients used by Kafka Streams:
```java
/**
 * @return The internal clients' assigned instance ids used for metrics collection.
 *
 * @throws IllegalArgumentException If {@code timeout} is negative.
 * @throws IllegalStateException If {@code KafkaStreams} is not running.
 * @throws TimeoutException Indicates that a request timed out.
 * @throws StreamsException For any other error that might occur.
 */
public ClientInstanceIds clientInstanceIds(Duration timeout);
```
This method is only permitted when Kafka Streams is in state RUNNING or REBALANCING. In the event that Kafka Streams is not in state RUNNING or REBALANCING, the method throws an IllegalStateException.
In the event that any of the client instance ids cannot be obtained within the timeout, the method throws org.apache.kafka.common.errors.TimeoutException.
The new interface org.apache.kafka.streams.ClientInstanceIds is defined as follows:
```java
/**
 * Encapsulates the client instance ids used for metrics collection by
 * producers, consumers and admin clients used by Kafka Streams.
 */
public interface ClientInstanceIds {

    /**
     * Get the client instance id of the admin client.
     *
     * @return the client instance id
     *
     * @throws IllegalStateException If telemetry is disabled on the admin client.
     */
    Uuid adminInstanceId();

    /**
     * Get the client instance ids of the consumers.
     *
     * @return a map from thread key to client instance id
     */
    Map<String, Uuid> consumerInstanceIds();

    /**
     * Get the client instance ids of the producers.
     *
     * @return a map from thread key to client instance id
     */
    Map<String, Uuid> producerInstanceIds();
}
```
Finally, a new method is added to org.apache.kafka.clients.CommonClientConfigs to return the ClientTelemetryReporter if configured. This mirrors the similar metricsReporters() methods in that class.
```java
public static Optional<ClientTelemetryReporter> telemetryReporter(String clientId, AbstractConfig config);
```
Broker configuration
Configuration | Description | Values |
---|---|---|
telemetry.max.bytes | The maximum size (after compression if compression is used) of telemetry pushed from a client to the broker. | int, default: 1048576, valid values: [1,...] |
Client configuration
This applies to producers, consumers, admin client, and of course embedded uses of these clients in frameworks such as Kafka Connect.
Configuration | Description | Values |
---|---|---|
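enable.metrics.push | Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client. | true (default): the client will push metrics if there are any matching subscriptions. false: the client will not push metrics. |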
Following the usual convention, the string constant ENABLE_METRICS_PUSH_CONFIG ("enable.metrics.push") will be added to CommonClientConfigs, ProducerConfig, ConsumerConfig, AdminClientConfig and StreamsConfig.
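For illustration, a client opts out of pushing metrics by setting this property to false. The sketch defines the key locally instead of importing CommonClientConfigs so that it compiles on its own; MetricsPushConfigExample is a hypothetical name.

```java
import java.util.Properties;

final class MetricsPushConfigExample {
    // Same key as ENABLE_METRICS_PUSH_CONFIG; defined locally to keep the sketch self-contained.
    static final String ENABLE_METRICS_PUSH_CONFIG = "enable.metrics.push";

    // Client properties that opt out of pushing metrics, even if a matching
    // client metrics subscription exists on the cluster.
    static Properties optOut(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put(ENABLE_METRICS_PUSH_CONFIG, "false");
        return props;
    }

    // Reads the setting, treating an absent property as the default of true.
    static boolean pushEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty(ENABLE_METRICS_PUSH_CONFIG, "true"));
    }
}
```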
Client metrics configuration
These are the configurations for client metrics resources. A client metrics subscription is defined by the configurations for a resource of type CLIENT_METRICS.
Configuration | Description | Values |
---|---|---|
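metrics | A list of telemetry metric name prefixes which specify the metrics of interest. | An empty list means no metrics subscribed. A list containing just "*" means all metrics subscribed. Otherwise, the list entries are prefix-matched against the metric names. |
interval.ms | The client metrics push interval in milliseconds. | Default: 300000 (5 minutes), minimum: 100, maximum: 3600000 (1 hour) |
match | The match criteria for selecting which clients the subscription matches. If a client matches all of these criteria, the client matches the subscription. | A list of key-value pairs. The valid keys are: client_instance_id (CLIENT_INSTANCE_ID UUID string representation), client_id (the client's reported client.id in the GetTelemetrySubscriptionsRequest), client_software_name (client software implementation name), client_software_version (client software implementation version), client_source_address (client connection's source address from the broker's point of view), client_source_port (client connection's source port from the broker's point of view). The values are anchored regular expressions. |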
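A minimal sketch of how the match and interval.ms settings above could be validated and evaluated. ClientMetricsMatch is a hypothetical name, the client attributes are assumed to be available as a map keyed by the selector names, and an empty match map is assumed to match every client (since a client trivially meets zero criteria). Java's Matcher.matches() requires the whole value to match, which gives the anchored semantics.

```java
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

final class ClientMetricsMatch {
    static final Set<String> VALID_KEYS = Set.of(
            "client_instance_id", "client_id", "client_software_name",
            "client_software_version", "client_source_address", "client_source_port");

    static final int MIN_INTERVAL_MS = 100;
    static final int MAX_INTERVAL_MS = 3_600_000;

    // Rejects unknown selector keys and out-of-range intervals. Pattern.compile throws
    // PatternSyntaxException (an IllegalArgumentException) for an invalid regular expression.
    static void validate(Map<String, String> match, int intervalMs) {
        for (Map.Entry<String, String> e : match.entrySet()) {
            if (!VALID_KEYS.contains(e.getKey())) {
                throw new IllegalArgumentException("Unknown match key: " + e.getKey());
            }
            Pattern.compile(e.getValue());
        }
        if (intervalMs < MIN_INTERVAL_MS || intervalMs > MAX_INTERVAL_MS) {
            throw new IllegalArgumentException("interval.ms out of range: " + intervalMs);
        }
    }

    // A client matches only if every selector matches its corresponding attribute.
    static boolean matches(Map<String, String> match, Map<String, String> clientAttributes) {
        for (Map.Entry<String, String> e : match.entrySet()) {
            String value = clientAttributes.get(e.getKey());
            if (value == null || !Pattern.compile(e.getValue()).matcher(value).matches()) {
                return false;
            }
        }
        return true;
    }
}
```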
New error codes
TELEMETRY_TOO_LARGE
- Client sent a PushTelemetryRequest with a payload that was too large.
UNKNOWN_SUBSCRIPTION_ID
- Client sent a PushTelemetryRequest with an invalid or outdated SubscriptionId. The configured subscriptions have changed.
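A sketch of how a broker might map a PushTelemetryRequest to these error codes. It assumes the subscription id is checked before the payload size (the order is not specified here) and that the caller passes the size of the Metrics field as received, i.e., after any compression, matching the definition of telemetry.max.bytes. PushTelemetryValidation and Outcome are hypothetical names.

```java
final class PushTelemetryValidation {

    enum Outcome { NONE, UNKNOWN_SUBSCRIPTION_ID, TELEMETRY_TOO_LARGE }

    // telemetry.max.bytes default.
    static final int DEFAULT_TELEMETRY_MAX_BYTES = 1_048_576;

    static Outcome validate(int receivedSubscriptionId, int expectedSubscriptionId,
                            int metricsPayloadBytes, int telemetryMaxBytes) {
        if (receivedSubscriptionId != expectedSubscriptionId) {
            // The subscription set changed; the client must call GetTelemetrySubscriptions again.
            return Outcome.UNKNOWN_SUBSCRIPTION_ID;
        }
        if (metricsPayloadBytes > telemetryMaxBytes) {
            // metricsPayloadBytes is the on-the-wire size, i.e., after compression if any.
            return Outcome.TELEMETRY_TOO_LARGE;
        }
        return Outcome.NONE;
    }
}
```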
Metrics
The following new broker metrics should be added:
Metric Name | Type | Group | Tags | Description |
---|---|---|---|---|
ClientMetricsInstanceCount | Gauge | ClientMetrics | | Current number of client metric instances being managed by the broker, e.g., the number of unique CLIENT_INSTANCE_IDs with an empty or non-empty subscription set. |
ClientMetricsUnknownSubscriptionRequestCount, ClientMetricsUnknownSubscriptionRequestRate | Meter | ClientMetrics | | Total number/rate of PushTelemetryRequests with an unknown subscription id. |
ClientMetricsThrottleCount, ClientMetricsThrottleRate | Meter | ClientMetrics | client_instance_id | Total number/rate of telemetry requests throttled because the client's request rate exceeded the allowed PushIntervalMs. |
ClientMetricsPluginExportCount, ClientMetricsPluginExportRate | Meter | ClientMetrics | client_instance_id | Total number/rate of metrics requests pushed to metrics plugins, i.e., the number/rate of exportMetrics() calls. |
ClientMetricsPluginErrorCount, ClientMetricsPluginErrorRate | Meter | ClientMetrics | client_instance_id | Total number/rate of exceptions raised from the plugin's exportMetrics(). |
ClientMetricsPluginExportTimeAvg, ClientMetricsPluginExportTimeMax | Avg and Max | ClientMetrics | client_instance_id | Average and maximum time the broker spends invoking the plugin's exportMetrics() call. |
Security
Since client metric subscriptions are primarily aimed at the infrastructure operator that is managing the Kafka cluster, it should be sufficient to limit the config control operations to the CLUSTER resource.
There will be no permission checks on the PushTelemetryRequest itself.
API Request | Resource | ACL operation |
---|---|---|
DescribeConfigs | CLUSTER | DESCRIBE_CONFIGS |
AlterConfigs | CLUSTER | ALTER_CONFIGS |
GetTelemetrySubscriptions | N/A | N/A |
PushTelemetry | N/A | N/A |
Tools
kafka-configs.sh
The kafka-configs.sh tool can be used to administer client metrics configuration. The kafka-client-metrics.sh tool is preferred because it is easier to use.
A new entity-type of client-metrics is added.
For this entity type, the allowed configs are: interval.ms, metrics and match.
Some examples of use of kafka-configs.sh are shown in the next section for comparison with kafka-client-metrics.sh.
kafka-client-metrics.sh
A new kafka-client-metrics.sh tool is added which provides a simpler interface for administering client metrics configuration resources than using kafka-configs.sh directly.
Here's the command-line syntax summary.
```
This tool helps to manipulate and describe client metrics configurations.
Option                                 Description
------                                 -----------
--alter                                Alter the configuration of the client
                                         metrics resource.
--bootstrap-server <String: server to  REQUIRED: The Kafka server to connect
  connect to>                            to.
--command-config <String: command      Property file containing configs to be
  config property file>                  passed to Admin Client.
--delete                               Delete the configuration of the client
                                         metrics resource.
--describe                             List configurations for client metrics
                                         resources.
--generate-name                        Generate a UUID to use as the name.
--help                                 Print usage information.
--interval                             The metrics push interval in
                                         milliseconds.
--match                                Matching selector ‘k1=v1,k2=v2’. The
                                         following is a list of valid selector
                                         names:
                                         client_instance_id
                                         client_id
                                         client_software_name
                                         client_software_version
                                         client_source_address
                                         client_source_port
--metrics                              Telemetry metric name prefixes ‘m1,m2’.
--name <String>                        Name of client metrics configuration
                                         resource.
--version                              Display Kafka version.
```
Here are some examples.
List all client metrics configuration resources
```
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --describe

$ kafka-configs.sh --bootstrap-server $BROKERS --describe --entity-type client-metrics
```
Describe a client metrics configuration resource
```
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --describe --name MYMETRICS

$ kafka-configs.sh --bootstrap-server $BROKERS --describe --entity-type client-metrics --entity-name MYMETRICS
```
Create a client metrics configuration resource, generating a unique name
In this example, --generate-name causes the tool to create a type-4 UUID to use as the client metrics configuration resource name. There is no equivalent in kafka-configs.sh.
```
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --alter --generate-name \
    --metrics org.apache.kafka.producer.node.request.latency.,org.apache.kafka.consumer.node.request.latency. \
    --interval 60000
```
Create a client metrics configuration resource for all Python v1.2.* clients
```
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --alter --name MYMETRICS \
    --metrics org.apache.kafka.consumer. \
    --interval 60000 \
    --match "client_software_name=kafka_python,client_software_version=1\.2\..*"

$ kafka-configs.sh --bootstrap-server $BROKERS --alter --entity-type client-metrics --entity-name MYMETRICS \
    --add-config "metrics=org.apache.kafka.consumer.,interval.ms=60000,match=[client_software_name=kafka_python,client_software_version=1\.2\..*]"
```
Delete a client metrics configuration resource
Deleting a client metrics configuration resource with kafka-configs.sh requires listing the names of the configs to delete.
```
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --delete --name MYMETRICS

$ kafka-configs.sh --bootstrap-server $BROKERS --alter --entity-type client-metrics --entity-name MYMETRICS \
    --delete-config metrics,interval.ms,match
```
...
Example workflows showcasing how this feature may be used for proactive and reactive purposes.
Proactive monitoring
The Kafka cluster is configured to collect all standard metrics pushed by the client at an interval of 60 seconds. The metrics plugin forwards the collected metrics to an imaginary system that monitors a set of well-known metrics and triggers alarms when trends go out of profile.
The monitoring system detects an anomaly for CLIENT_INSTANCE_ID=4e6fb54c-b8e6-4517-889b-e15b99c09c20: one of its monitored metrics has exceeded the threshold of 5000 milliseconds for more than 180 seconds.
Before sending the alert to the incident management system, the monitoring system collects a set of labels that are associated with this CLIENT_INSTANCE_ID, such as:
- client.id
- client_source_address and client_source_port on broker id X (1 or more such mappings based on how many connections the client has used to push metrics).
- principal
- tenant
- client_software_name and client_software_version
- In case of consumer: group_id, group_instance_id (if configured) and the latest known group_member_id.
- In case of transactional producer: transactional_id
This collection of information, along with the triggered metric, is sent to the incident management system for further investigation or aggregation, and provides enough information to identify the client and where it runs. Further action might be to contact the organization or team that matches the principal, transactional.id, source address and so on.
Reactive monitoring
The Kafka cluster configuration for metrics collection (i.e., metrics subscriptions) is irrelevant to this use-case, given that a metrics plugin is enabled on the brokers. The metrics plugin is configured to write metrics to a topic. A support system with an interactive interface is reading from this metrics topic, and has an Admin client to configure the cluster with desired metrics subscriptions.
The application owner reports a lagging consumer that is not able to keep up with the incoming message rate and asks for the Kafka operator to help troubleshoot. The application owner, who unfortunately does not know the client instance id of the consumer, provides the client.id, userid, and source address.
The Kafka operator adds a metrics subscription for metrics matching prefix "org.apache.kafka.consumer." with client_source_address and client_id as matching selectors. Since this is a live troubleshooting case, the metrics push interval is set to a low 10 seconds.
The metrics subscription propagates through configuration change notifications to all brokers, which update their local metrics subscription config cache and regenerate the SubscriptionId.
Upon the next PushTelemetryRequest or GetTelemetrySubscriptionsRequest from the clients matching the given source address and client_id, the receiving broker sees that the SubscriptionId no longer matches its metrics subscription cache. The client then retrieves the new metrics subscription and schedules its next metrics push at a random delay between PushIntervalMs * 0.5 and PushIntervalMs * 1.5.
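The randomized scheduling in the step above can be sketched as follows. PushScheduling is a hypothetical name, the half-open range is an assumption, and spreading out pushes from many clients that picked up the same subscription at the same moment is a likely motivation for the jitter rather than something stated here.

```java
import java.util.concurrent.ThreadLocalRandom;

final class PushScheduling {
    // Delay before the next push after a (new) subscription is received: a uniformly
    // random value in [0.5 * PushIntervalMs, 1.5 * PushIntervalMs).
    static long initialPushDelayMs(int pushIntervalMs) {
        if (pushIntervalMs < 2) {
            // nextLong(origin, bound) requires origin < bound.
            throw new IllegalArgumentException("pushIntervalMs too small: " + pushIntervalMs);
        }
        long low = pushIntervalMs / 2;
        long high = (long) pushIntervalMs + pushIntervalMs / 2;
        return ThreadLocalRandom.current().nextLong(low, high);
    }
}
```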
Upon the next PushTelemetryRequest, which now includes the subscribed metrics, the metrics are written to the output topic and the PushIntervalMs is adjusted to the configured interval of 10 seconds. This repeats until the metrics subscription configuration is changed.
Multiple consumers from the same source address and using the same client_id may now be pushing metrics to the cluster. The support system starts receiving the metrics and displays the matching client metrics to the operator. If the operator is able to further narrow down the client instance through other information in the metrics it may alter the metrics subscription to match on that client's client_instance_id. But in either case the metrics matching the given client.id are displayed to the operator.
The operator identifies an increasing trend in client.consumer.processing.time which indicates slow per-message processing in the application and reports this back to the application owner, ruling out the client and Kafka cluster from the problem space.
The operator removes the metrics subscription which causes the next PushTelemetryResponse to return an error indicating that the metrics subscription has changed, causing the client to get its new subscription which is now back to empty.
Future work
Tracing and logging are outside the scope of this proposal but the use of OpenTelemetry allows for those additional two methods within the same semantics and nomenclature outlined by this proposal.
When we add new client metrics to Kafka, we can choose whether to make them standard or required metrics in order to enhance the ability to analyze and troubleshoot client performance.
We could also add proper support for histogram metrics in the client, and this would nicely fit with histograms in OpenTelemetry metrics.
In network environments where there are network proxies (such as Kubernetes ingress) on the path between the client and broker, it may be problematic to obtain the originating client's IP address. One way to address this in the future would be to support the PROXY protocol in Kafka.
Configuration properties or extensions to the Metrics plugin interface on the broker to change the temporality are outside the scope of this KIP and may be addressed at a later time as the need arises. For example, if a metrics back-end has a preference for a particular temporality, it may be helpful to let it indicate that using the Metrics plugin interface so that the broker can use this temporality when requesting metrics from the clients.
Compatibility, Deprecation, and Migration Plan
...