
...

  • GetTelemetrySubscriptions RPC - protocol request used by the client to acquire its initial Client instance ID and to continually retrieve updated metrics subscriptions.
  • PushTelemetry RPC - protocol request used by the client to push metrics to any broker it is connected to.
  • Standard and required metrics - a set of standardized metrics that all supporting clients should provide.
  • AdminAPI config - the AdminAPI configuration interface with a new CLIENT_METRICS resource type is used to manage metrics subscriptions.
  • Client metrics plugin / extending the MetricsReporter interface - a broker plugin interface that performs something meaningful with the metrics. This plugin will typically forward the metrics to a time-series database. It is recommended that any broker-side processing is kept to a minimum for scalability and performance reasons.

...

  • A list of metric name prefixes (in the telemetry metric name format described later)
  • The push interval which specifies the frequency of pushing the metrics from the client
  • Matching criteria which can be used to filter which clients match the subscription, such as client version

Client identification

...

and the client instance id

The value of metrics collection is limited if the metrics can’t be tied to an entity, in this case a client instance. While aggregate metrics do provide some value in trends and superficial monitoring, for metrics to be truly useful, there needs to be a way to identify a particular client instance.

...

  • Client instance - a single instance of a producer, consumer, Admin client, etc.
  • Application process instance - a single application runtime, which may contain multiple client instances such as a Kafka Streams application with a mix of Admin, producer and consumer instances.

As part of the initial metrics handshake, the broker generates a unique client instance id (a UUID4) and returns it to the client. The client must use the returned client instance id for the remaining lifetime of the client instance, regardless of which broker it communicates with. The client instance id is therefore a temporary, unique, generated identifier for identifying a client instance for the purpose of metrics collection.
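The broker-side assignment step can be sketched as follows. `ClientInstanceIdAssigner` is a hypothetical helper and is not part of Kafka. A request with a Null ClientInstanceId triggers generation of a new UUID4, which is what Java's `UUID.randomUUID()` produces. A request that already carries an id gets Null back.

```java
import java.util.UUID;

// Hypothetical broker-side helper illustrating client instance id assignment.
final class ClientInstanceIdAssigner {

    // Returns a new UUID4 when the request's ClientInstanceId is null (initial handshake),
    // or null when the client already has an id, as in GetTelemetrySubscriptionsResponse.
    static UUID assignIfAbsent(UUID requestedId) {
        if (requestedId == null) {
            return UUID.randomUUID(); // randomUUID() yields a version 4 (random) UUID
        }
        return null;
    }

    public static void main(String[] args) {
        UUID assigned = assignIfAbsent(null);
        System.out.println("assigned " + assigned + ", version " + assigned.version());
        System.out.println("second request returns " + assignIfAbsent(assigned));
    }
}
```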

The Kafka Java client provides an API for the application to read the generated client instance id. This helps map the collected metrics back to the client and identify it. Other clients implementing this proposal could provide an equivalent interface or use an alternative, such as logging the client instance id as soon as it has acquired it from the cluster. This allows live and post-mortem correlation between collected metrics and a client instance.

...

Kafka Streams applications have an application.id configured, and this identifier should be included as the application_id metrics label.

...

There are many more Kafka client metrics beyond the standard and required metrics, some of which are quite specific to the way that the Java client works. Any of these other metrics could conceivably be included in client metrics subscriptions, but there's no expectation that non-Java clients would implement them.

Metric types

The OTLP metric data point types in the following tables correspond to the OpenTelemetry v1 metrics protobuf message types. A short summary:

...

All standard telemetry metric names begin with the prefix "org.apache.kafka.". This is omitted from the table for brevity. The required metrics are bold.

Telemetry metric name

OTLP metric data point type

Labels

Description

Existing Kafka metric name

producer.connection.creation.rate 

Gauge


The rate of connections established per second.

“connection-creation-rate”, group=“producer-metrics”

producer.connection.creation.total 

Sum


The total number of connections established.

“connection-creation-total”, group=“producer-metrics”

producer.node.request.latency.avg 

Gauge

node_id

The average request latency in ms for a node.

“request-latency-avg”, group=“producer-node-metrics”

producer.node.request.latency.max 

Gauge

node_id

The maximum request latency in ms for a node.

“request-latency-max”, group=“producer-node-metrics”

producer.produce.throttle.time.avg 

Gauge


The average time in ms a request was throttled by the broker.

“produce-throttle-time-avg”, group=“producer-metrics”

producer.produce.throttle.time.max 

Gauge


The maximum time in ms a request was throttled by the broker.

“produce-throttle-time-max”, group=“producer-metrics”

producer.record.queue.time.avg 

Gauge


The average time in ms record batches spent in the send buffer.

“record-queue-time-avg”, group=“producer-metrics”

producer.record.queue.time.max 

Gauge


The maximum time in ms record batches spent in the send buffer.

“record-queue-time-max”, group=“producer-metrics”

...

All standard telemetry metric names begin with the prefix "org.apache.kafka.". This is omitted from the table for brevity. The required metrics are bold.

Telemetry metric name

OTLP metric data point type

Labels

Description

Existing metric name

consumer.connection.creation.rate 

Gauge


The rate of connections established per second.

“connection-creation-rate”, group=“consumer-metrics”

consumer.connection.creation.total 

Sum


The total number of connections established.

“connection-creation-total”, group=“consumer-metrics”

consumer.node.request.latency.avg 

Gauge

node_id

The average request latency in ms for a node.

“request-latency-avg”, group=“consumer-node-metrics”

consumer.node.request.latency.max 

Gauge

node_id

The maximum request latency in ms for a node.

“request-latency-max”, group=“consumer-node-metrics”

consumer.poll.idle.ratio.avg 

Gauge


The average fraction of time the consumer’s poll() is idle as opposed to waiting for the user code to process records.

“poll-idle-ratio-avg”, group=“consumer-metrics”

consumer.coordinator.commit.latency.avg 

Gauge


The average time taken for a commit request.

“commit-latency-avg”, group=“consumer-coordinator-metrics”

consumer.coordinator.commit.latency.max 

Gauge


The maximum time taken for a commit request.

“commit-latency-max”, group=“consumer-coordinator-metrics”

consumer.coordinator.assigned.partitions 

Gauge


The number of partitions currently assigned to this consumer.

“assigned-partitions”, group=“consumer-coordinator-metrics”

consumer.coordinator.rebalance.latency.avg 

Gauge


The average time taken for a group rebalance.

“rebalance-latency-avg”, group=“consumer-coordinator-metrics”

consumer.coordinator.rebalance.latency.max 

Gauge


The maximum time taken for a group rebalance.

“rebalance-latency-max”, group=“consumer-coordinator-metrics”

consumer.coordinator.rebalance.latency.total 

Sum


The total time taken for group rebalances.

“rebalance-latency-total”, group=“consumer-coordinator-metrics”

consumer.fetch.manager.fetch.latency.avg 

Gauge


The average time taken for a fetch request.

“fetch-latency-avg”, group=“consumer-fetch-manager-metrics”

consumer.fetch.manager.fetch.latency.max 

Gauge


The maximum time taken for a fetch request.

“fetch-latency-max”, group=“consumer-fetch-manager-metrics”

...

Label name

Description

application_id

application.id (Kafka Streams only)

client_rack

client.rack (if configured)

group_id

group.id (consumer)

group_instance_id

group.instance.id (consumer)

group_member_id

Group member id (if any, consumer)

transactional_id

transactional.id (producer)
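As an illustration, the labels above could be derived from the client configuration roughly as follows. `ClientLabels` and its signature are hypothetical. The sketch omits any label whose source value is unset. This matches the "if configured" and "if any" qualifiers in the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper mapping client configuration to the client-side metric labels.
final class ClientLabels {

    static Map<String, String> fromConfig(Map<String, String> config, String groupMemberId) {
        Map<String, String> labels = new LinkedHashMap<>();
        putIfSet(labels, "application_id", config.get("application.id"));       // Kafka Streams only
        putIfSet(labels, "client_rack", config.get("client.rack"));
        putIfSet(labels, "group_id", config.get("group.id"));                   // consumer
        putIfSet(labels, "group_instance_id", config.get("group.instance.id")); // consumer
        putIfSet(labels, "group_member_id", groupMemberId);                     // runtime value, consumer
        putIfSet(labels, "transactional_id", config.get("transactional.id"));   // producer
        return labels;
    }

    // Labels whose source value is absent or empty are left out entirely.
    private static void putIfSet(Map<String, String> labels, String name, String value) {
        if (value != null && !value.isEmpty()) {
            labels.put(name, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("group.id", "orders", "client.rack", "rack-1");
        System.out.println(fromConfig(config, null)); // {client_rack=rack-1, group_id=orders}
    }
}
```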

Broker-added labels

...

Label name

Description

client_instance_id

The generated CLIENT_INSTANCE_ID.

client_id

client.id as reported in the Kafka protocol header.

client_software_name

The client’s implementation name as reported in ApiVersionRequest.

client_software_version

The client’s version as reported in ApiVersionRequest.

client_source_address

The client connection’s source address.

client_source_port

The client connection’s source port.

principal

Client’s security principal. Content depends on authentication method.

broker_id

Receiving broker’s node-id.

...

A client that supports this metrics interface and identifies a supporting broker (by detecting at least GetTelemetrySubscriptionsRequestV0 in the ApiVersionResponse) will start by sending a GetTelemetrySubscriptionsRequest with the ClientInstanceId field set to Null to one randomly selected connected broker. The response supplies its client instance id, the subscribed metrics, the push interval, the accepted compression types, and so on. This handshake with a Null ClientInstanceId is only performed once for a client instance's lifetime. Subsequent GetTelemetrySubscriptionsRequests must include the ClientInstanceId returned in the first response, regardless of broker.
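On the client side, this handshake reduces to a small piece of state. The client sends Null until it has received an id, then reuses that id for every later request to any broker. The class below is a hypothetical sketch of that state only. It leaves out broker-support detection through ApiVersionResponse and the network calls themselves.

```java
import java.util.UUID;

// Hypothetical client-side state for the GetTelemetrySubscriptions handshake.
final class TelemetryHandshakeState {
    private UUID clientInstanceId; // null until the first response has been received

    // Value for GetTelemetrySubscriptionsRequest.ClientInstanceId: null only on the first request.
    synchronized UUID requestClientInstanceId() {
        return clientInstanceId;
    }

    // Keeps the id from the first response. Later responses carry null and are ignored,
    // so the id stays fixed for the lifetime of the client instance.
    synchronized void onResponse(UUID responseClientInstanceId) {
        if (clientInstanceId == null && responseClientInstanceId != null) {
            clientInstanceId = responseClientInstanceId;
        }
    }

    public static void main(String[] args) {
        TelemetryHandshakeState state = new TelemetryHandshakeState();
        System.out.println("first request id: " + state.requestClientInstanceId()); // null
        state.onResponse(UUID.randomUUID());
        System.out.println("later request id: " + state.requestClientInstanceId());
    }
}
```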

Upon receiving the GetTelemetrySubscriptionsResponse, the client shall update its internal metrics collection to match the received subscription (absolute update) and update its push interval timer according to the received PushIntervalMs. The first metrics push should be randomized between 0.5 * PushIntervalMs and 1.5 * PushIntervalMs. This ensures that not all clients start pushing metrics at the same time when a cluster comes back up after some downtime.
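Here is a sketch of the randomized first push. It assumes a uniform draw, because the text fixes only the bounds and not the distribution. `ThreadLocalRandom.nextDouble(origin, bound)` returns a value in [origin, bound). The delay therefore falls in [0.5, 1.5) times PushIntervalMs. `FirstPushDelay` is a hypothetical name.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper computing the jittered delay before the first metrics push.
final class FirstPushDelay {

    // Uniformly random delay in [0.5 * pushIntervalMs, 1.5 * pushIntervalMs).
    static long firstPushDelayMs(long pushIntervalMs) {
        double factor = ThreadLocalRandom.current().nextDouble(0.5, 1.5);
        return (long) (pushIntervalMs * factor);
    }

    public static void main(String[] args) {
        long intervalMs = 300_000L; // 5 minutes
        System.out.println("first push in " + firstPushDelayMs(intervalMs) + " ms");
    }
}
```

Only the first push is jittered. After that, the client pushes at the regular PushIntervalMs.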

If GetTelemetrySubscriptionsResponse.RequestedMetrics indicates that no metrics are desired (RequestedMetrics is Null), the client should send a new GetTelemetrySubscriptionsRequest after the PushIntervalMs has expired.

If GetTelemetrySubscriptionsResponse.RequestedMetrics is non-empty but does not match any metrics the client provides, then the client should send a PushTelemetryRequest at the indicated PushIntervalMs interval with an empty metrics blob. This lets a broker-side metrics plugin distinguish non-responsive or buggy clients from clients that simply have no metrics matching the subscription set.
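The two rules above can be combined with the prefix-matching semantics of RequestedMetrics, as sketched below. `SubscriptionMatcher` and `NextAction` are hypothetical names. The sketch treats a Null RequestedMetrics and an empty one the same way, as "no metrics subscribed". A lone empty-string prefix matches every metric name, because every string starts with the empty string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side evaluation of GetTelemetrySubscriptionsResponse.RequestedMetrics.
final class SubscriptionMatcher {

    enum NextAction { RESUBSCRIBE_AFTER_INTERVAL, PUSH_EMPTY_AT_INTERVAL, PUSH_METRICS_AT_INTERVAL }

    // Returns the client's metric names selected by the requested prefixes.
    // A null or empty prefix list selects nothing; an empty-string prefix selects everything.
    static List<String> selectMetrics(List<String> requestedPrefixes, List<String> clientMetrics) {
        List<String> selected = new ArrayList<>();
        if (requestedPrefixes == null) {
            return selected;
        }
        for (String metric : clientMetrics) {
            for (String prefix : requestedPrefixes) {
                if (metric.startsWith(prefix)) {
                    selected.add(metric);
                    break;
                }
            }
        }
        return selected;
    }

    static NextAction nextAction(List<String> requestedPrefixes, List<String> clientMetrics) {
        if (requestedPrefixes == null || requestedPrefixes.isEmpty()) {
            return NextAction.RESUBSCRIBE_AFTER_INTERVAL; // no metrics desired
        }
        return selectMetrics(requestedPrefixes, clientMetrics).isEmpty()
                ? NextAction.PUSH_EMPTY_AT_INTERVAL   // subscribed, but nothing matches
                : NextAction.PUSH_METRICS_AT_INTERVAL;
    }

    public static void main(String[] args) {
        List<String> metrics = List.of(
                "org.apache.kafka.producer.node.request.latency.avg",
                "org.apache.kafka.producer.record.queue.time.max");
        System.out.println(nextAction(List.of("org.apache.kafka.consumer."), metrics)); // PUSH_EMPTY_AT_INTERVAL
    }
}
```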

...

Client termination

When a client with an active metrics subscription is being shut down, it should send its final metrics without waiting for the PushIntervalMs time.

To avoid the receiving broker’s metrics rate-limiter discarding this out-of-profile push, the PushTelemetryRequest.Terminating field must be set to true. A broker must only allow one such unthrottled metrics push for each combination of client ID and SubscriptionId.
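On the broker, the one-unthrottled-push rule amounts to remembering which (client ID, SubscriptionId) combinations have already used their terminating push. This is a hypothetical in-memory sketch and needs Java 16+ for the record type. A real broker would also need to expire old entries, which this sketch does not show.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical broker-side bookkeeping for Terminating pushes (Java 16+ for the record type).
final class TerminatingPushTracker {
    private record Key(String clientId, int subscriptionId) {}

    private final Set<Key> used = new HashSet<>();

    // True if this push may bypass the metrics rate-limiter. It must be flagged Terminating
    // and be the first such push for this (client ID, SubscriptionId) combination.
    synchronized boolean allowUnthrottled(String clientId, int subscriptionId, boolean terminating) {
        if (!terminating) {
            return false; // normal pushes go through regular throttling
        }
        return used.add(new Key(clientId, subscriptionId)); // add() returns false if already present
    }

    public static void main(String[] args) {
        TerminatingPushTracker tracker = new TerminatingPushTracker();
        System.out.println(tracker.allowUnthrottled("app-1", 42, true)); // true
        System.out.println(tracker.allowUnthrottled("app-1", 42, true)); // false
    }
}
```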

If the client's metrics subscription has changed and the final metrics push fails with error code UNKNOWN_SUBSCRIPTION_ID, the terminating client has two options. It can obtain a new subscription ID by sending a GetTelemetrySubscriptionsRequest and then immediately send a PushTelemetryRequest with the Terminating flag set to true, or it can abandon the final metrics push.

The metrics should contain the reason for the client termination by including the client.terminating metric with the label “reason” set to a human-readable explanation of why the client is being shut down, such as “Fatal error: Static consumer fenced by newer instance”, or “Consumer closed by application”.

...

Error code

Reason

Client action

INVALID_RECORD (87)

Broker failed to decode or validate the client’s encoded metrics.

Log a warning to the application and schedule the next GetTelemetrySubscriptionsRequest for 5 minutes later.

UNKNOWN_SUBSCRIPTION_ID (NEW)

Client sent a PushTelemetryRequest with an invalid or outdated SubscriptionId. The configured subscriptions have changed.

Send a GetTelemetrySubscriptionsRequest to update the client's subscriptions and get a new SubscriptionId.

UNSUPPORTED_COMPRESSION_TYPE (76)

Client’s compression type is not supported by the broker.

Send a GetTelemetrySubscriptionsRequest to get an up-to-date list of the broker's supported compression types (and any subscription changes).
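The client actions in the table can be condensed into a single decision. The `PushError` enum is a hypothetical stand-in for the protocol error codes; UNKNOWN_SUBSCRIPTION_ID has no number assigned in this proposal. `java.util.logging` stands in for whatever logging the client uses. The method returns the delay before the next GetTelemetrySubscriptionsRequest, or -1 when none is needed.

```java
import java.time.Duration;
import java.util.logging.Logger;

// Hypothetical mapping from PushTelemetryResponse errors to the client actions in the table.
final class PushErrorHandler {
    enum PushError { NONE, INVALID_RECORD, UNKNOWN_SUBSCRIPTION_ID, UNSUPPORTED_COMPRESSION_TYPE }

    static final long NO_RESUBSCRIBE = -1L;
    private static final Logger LOG = Logger.getLogger(PushErrorHandler.class.getName());

    // Delay in ms before the next GetTelemetrySubscriptionsRequest, or NO_RESUBSCRIBE.
    static long resubscribeDelayMs(PushError error) {
        switch (error) {
            case INVALID_RECORD:
                LOG.warning("Broker failed to decode or validate the pushed metrics");
                return Duration.ofMinutes(5).toMillis();
            case UNKNOWN_SUBSCRIPTION_ID:       // subscriptions changed: fetch the new set now
            case UNSUPPORTED_COMPRESSION_TYPE:  // refresh the accepted compression types now
                return 0L;
            default:
                return NO_RESUBSCRIBE;
        }
    }

    public static void main(String[] args) {
        for (PushError error : PushError.values()) {
            System.out.println(error + " -> " + resubscribeDelayMs(error));
        }
    }
}
```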

...

Retries should preferably be attempted on the same broker connection, in particular for UNKNOWN_SUBSCRIPTION_ID, but another broker connection may be used at the discretion of the client.

How errors and warnings are propagated to the application is client- and language-specific. Simply logging the error is sufficient.

...

If there is no client metrics receiver plugin configured on the broker, it will respond to GetTelemetrySubscriptionsRequest with RequestedMetrics set to Null and a -1 SubscriptionId. The client should send a new GetTelemetrySubscriptionsRequest after the PushIntervalMs has expired. This allows the metrics receiver to be enabled or disabled without having to restart the broker or reset the client connection.

...

  • metrics - a comma-separated list of telemetry metric name prefixes, e.g., "org.apache.kafka.producer.node.request.latency., org.apache.kafka.consumer.coordinator.rebalance.latency.max". Whitespace is ignored.
  • interval.ms - metrics push interval in milliseconds. Defaults to 5 minutes if not specified.
  • match - client matching selector that is evaluated as a list of anchored regular expressions (i.e., "something.*" is treated as "^something.*$"). Any client that matches all of the selectors will be eligible for this metrics subscription. The regular expressions are compiled and executed using Google RE2/J. Initially supported selectors are:
    • client_instance_id - CLIENT_INSTANCE_ID UUID string representation.
    • client_id - client's reported client.id in the GetTelemetrySubscriptionsRequest.
    • client_software_name - client software implementation name.
    • client_software_version - client software implementation version.
    • client_source_address - client connection's source address from the broker's point of view.
    • client_source_port - client connection's source port from the broker's point of view.
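The sketch below shows how a broker might parse the `metrics` value and evaluate `match` selectors. The proposal specifies RE2/J. This sketch swaps in `java.util.regex.Pattern.matches`, which likewise requires the whole input to match, so "something.*" behaves as "^something.*$". Class and method names are hypothetical. The parser drops empty entries, so it does not model the empty-string "all metrics" case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical broker-side parsing of a CLIENT_METRICS subscription.
final class SubscriptionConfig {

    // Splits the comma-separated "metrics" value. All whitespace is ignored and empty entries are dropped.
    static List<String> parseMetrics(String value) {
        List<String> prefixes = new ArrayList<>();
        for (String part : value.replaceAll("\\s+", "").split(",")) {
            if (!part.isEmpty()) {
                prefixes.add(part);
            }
        }
        return prefixes;
    }

    // True if every selector regex matches the whole value of the corresponding client attribute.
    // Pattern.matches() is anchored at both ends, so "something.*" behaves like "^something.*$".
    static boolean matches(Map<String, String> selectors, Map<String, String> clientAttributes) {
        for (Map.Entry<String, String> selector : selectors.entrySet()) {
            String actual = clientAttributes.get(selector.getKey());
            if (actual == null || !Pattern.matches(selector.getValue(), actual)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(parseMetrics("org.apache.kafka.producer.node.request.latency., "
                + "org.apache.kafka.consumer.coordinator.rebalance.latency.max"));
        Map<String, String> client = Map.of("client_id", "billing-service", "client_source_port", "53122");
        System.out.println(matches(Map.of("client_id", "billing.*"), client)); // true
        System.out.println(matches(Map.of("client_id", "service.*"), client)); // false: anchored
    }
}
```

A broker evaluating many clients would compile each selector once rather than calling `Pattern.matches` per evaluation.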

...

Code Block
GetTelemetrySubscriptionsRequestV0 {
    ClientInstanceId uuid                // UUID4 unique for this client instance.
                                         // Must be set to Null on the first request, and to the returned ClientInstanceId
                                         // from the first response for all subsequent requests to any broker.
}

GetTelemetrySubscriptionsResponseV0 {
    ThrottleTime int32                   // Standard throttling.
    ErrorCode int16                      // Error code.
    ClientInstanceId uuid                // Assigned client instance id if ClientInstanceId was Null in the request, else Null.
    SubscriptionId int32                 // Unique identifier for the current subscription set for this client instance.
    AcceptedCompressionTypes Array[int8] // The compression types the broker accepts for PushTelemetryRequest.CompressionType
                                         // as listed in MessageHeaderV2.Attributes.CompressionType. The array will be sorted in
                                         // preference order from higher to lower. The CompressionType of NONE will not be
                                         // present in the response from the broker, though the broker does support uncompressed
                                         // client telemetry if none of the accepted compression codecs are supported by the client.
    PushIntervalMs int32                 // Configured push interval, which is the lowest configured interval in the current subscription set.
    DeltaTemporality bool                // If True; monotonic/counter metrics are to be emitted as deltas to the previous sample.
                                         // If False; monotonic/counter metrics are to be emitted as cumulative absolute values.
    RequestedMetrics Array[string]       // Requested telemetry metrics prefix string match.
                                         // Empty array: No metrics subscribed.
                                         // Array[0] empty string: All metrics subscribed.
                                         // Array[..]: prefix string match.
}

PushTelemetryRequestV0 {
    ClientInstanceId uuid                // UUID4 unique for this client instance, as returned in the first GetTelemetrySubscriptionsResponse.
    SubscriptionId int32                 // SubscriptionId from the GetTelemetrySubscriptionsResponse for the collected metrics.
    Terminating bool                     // Client is terminating.
    CompressionType int8                 // Compression codec used for .Metrics (ZSTD, LZ4, Snappy, GZIP, None).
                                         // Same values as that of the current MessageHeaderV2.Attributes.
    Metrics binary                       // Metrics encoded in OpenTelemetry MetricsData v1 protobuf format.
}

PushTelemetryResponseV0 {
    ThrottleTime int32                   // Standard throttling.
    ErrorCode int16                      // Error code.
}
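According to the AcceptedCompressionTypes comment, a client picks the first codec in the broker's preference order that it also supports. If there is none, it falls back to uncompressed (NONE). This minimal sketch assumes the standard Kafka record-batch codec ids (0 none, 1 gzip, 2 snappy, 3 lz4, 4 zstd). `CompressionSelector` is a hypothetical name.

```java
import java.util.List;
import java.util.Set;

// Hypothetical client-side choice of PushTelemetryRequest.CompressionType.
final class CompressionSelector {
    // Codec ids as used in the record batch attributes.
    static final byte NONE = 0, GZIP = 1, SNAPPY = 2, LZ4 = 3, ZSTD = 4;

    // Walks the broker's list from most to least preferred and returns the first codec
    // the client supports, or NONE, which the broker always accepts.
    static byte choose(List<Byte> acceptedInPreferenceOrder, Set<Byte> clientSupported) {
        for (byte codec : acceptedInPreferenceOrder) {
            if (clientSupported.contains(codec)) {
                return codec;
            }
        }
        return NONE;
    }

    public static void main(String[] args) {
        List<Byte> accepted = List.of(ZSTD, LZ4, GZIP);
        System.out.println(choose(accepted, Set.of(GZIP, LZ4))); // 3 (LZ4)
    }
}
```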

Metrics serialization format

...

The SubscriptionId is a unique identifier for a client instance's subscription set. The broker computes a CRC32 over the configured metrics subscriptions that match the given client, including the PushIntervalMs, and XORs the result with the ClientInstanceId. This SubscriptionId is returned to the client in the GetTelemetrySubscriptionsResponse, and the client passes that SubscriptionId in each subsequent PushTelemetryRequest for those received metrics. If the configured subscriptions are updated, resulting in a change for a client instance, the SubscriptionId is recalculated. On the next PushTelemetryRequest that uses the previous SubscriptionId, the broker finds that the received and expected SubscriptionIds differ and returns UNKNOWN_SUBSCRIPTION_ID to the client. When a client receives this error code, it immediately sends a GetTelemetrySubscriptionsRequest to retrieve the new subscription set along with a new SubscriptionId.
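Here is a sketch of the SubscriptionId derivation under stated assumptions. The text defines neither the exact bytes fed to CRC32 nor how the 128-bit ClientInstanceId is combined with a 32-bit CRC. In this sketch, the matching prefixes are sorted and hashed as comma-terminated UTF-8 strings, followed by the decimal PushIntervalMs. The UUID is folded to 32 bits by XORing its four 32-bit words. Both choices are hypothetical. Any deterministic encoding gives the property that matters: barring CRC collisions, a subscription change yields a different id.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.zip.CRC32;

// Hypothetical SubscriptionId derivation; the byte encoding and UUID folding are assumptions.
final class SubscriptionIds {

    static int compute(List<String> matchingPrefixes, int pushIntervalMs, UUID clientInstanceId) {
        List<String> sorted = new ArrayList<>(matchingPrefixes);
        Collections.sort(sorted); // make the id independent of configuration order

        CRC32 crc = new CRC32();
        for (String prefix : sorted) {
            crc.update(prefix.getBytes(StandardCharsets.UTF_8));
            crc.update(','); // terminator so ["ab", "c"] and ["a", "bc"] hash differently
        }
        crc.update(Integer.toString(pushIntervalMs).getBytes(StandardCharsets.UTF_8));
        int crc32 = (int) crc.getValue(); // low 32 bits of the checksum

        // Fold the 128-bit UUID into 32 bits so it can be XORed with the CRC.
        long msb = clientInstanceId.getMostSignificantBits();
        long lsb = clientInstanceId.getLeastSignificantBits();
        int folded = (int) (msb >>> 32) ^ (int) msb ^ (int) (lsb >>> 32) ^ (int) lsb;
        return crc32 ^ folded;
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        List<String> prefixes = List.of("org.apache.kafka.producer.");
        System.out.println(compute(prefixes, 300_000, id) + " vs " + compute(prefixes, 60_000, id));
    }
}
```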

...

If the client has not yet requested a client instance id from the broker, this call may block up to the duration of the timeout. If no client instance id can be retrieved within the timeout, an error is returned (such as timeout, feature not supported by broker, or auth failure).
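One way to implement this blocking accessor is sketched below with a hypothetical `ClientInstanceIdHandle` backed by a `CompletableFuture`. The telemetry handshake completes the future, and callers wait at most the given timeout. In this sketch, failures (timeout, unsupported feature, auth failure) surface as an unchecked exception. That is a choice of the sketch, not something the text prescribes.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical holder for the client instance id, completed by the telemetry handshake.
final class ClientInstanceIdHandle {
    private final CompletableFuture<UUID> id = new CompletableFuture<>();

    // Called when GetTelemetrySubscriptionsResponse delivers the id.
    void complete(UUID clientInstanceId) {
        id.complete(clientInstanceId);
    }

    // Called when the id cannot be obtained, e.g. unsupported by the broker or auth failure.
    void fail(Throwable cause) {
        id.completeExceptionally(cause);
    }

    // Returns immediately once the id is known; otherwise blocks for at most the timeout.
    UUID clientInstanceId(Duration timeout) {
        try {
            return id.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("client instance id not available within " + timeout, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("client instance id could not be retrieved", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for client instance id", e);
        }
    }

    public static void main(String[] args) {
        ClientInstanceIdHandle handle = new ClientInstanceIdHandle();
        new Thread(() -> handle.complete(UUID.randomUUID())).start(); // simulated handshake
        System.out.println("client instance id: " + handle.clientInstanceId(Duration.ofSeconds(5)));
    }
}
```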

...

Configuration

Description

Values

metrics

A list of telemetry metric name prefixes which specify the metrics of interest.

An empty list means no metrics subscribed.

A list containing just an empty string means all metrics subscribed.

Otherwise, the list entries are prefix-matched against the metric names.

interval.ms

The client metrics push interval in milliseconds.

Default: 300000 (5 minutes)

match

The match criteria for selecting which clients the subscription matches. If a client matches all of these criteria, the client matches the subscription.

A list of key-value pairs.

The valid keys are:

  • client_instance_id - CLIENT_INSTANCE_ID UUID string representation.
  • client_id - client's reported client.id in the GetTelemetrySubscriptionsRequest.
  • client_software_name - client software implementation name.
  • client_software_version - client software implementation version.
  • client_source_address - client connection's source address from the broker's point of view.
  • client_source_port - client connection's source port from the broker's point of view.

The values are anchored regular expressions.

New error codes

UNKNOWN_SUBSCRIPTION_ID - Client sent a PushTelemetryRequest with an invalid or outdated SubscriptionId. The configured subscriptions have changed.

...