...

For metrics to be collected, a MetricsPlugin that implements the ClientTelemetry interface must be configured on the brokers, and at least one metrics subscription must be configured through the Admin API. Only then are metrics subscriptions propagated to clients, and only then do clients push metrics to the broker. Enabling client metrics collection is therefore an explicit decision by the cluster operator.

Because this KIP introduces a new kind of ConfigResource, it is supported only on KRaft clusters. This avoids the work of storing the new resources in ZooKeeper, given that ZooKeeper support is already deprecated.

Client metrics subscription

...

  • KafkaAdminClient: the "admin client runnable" thread
  • KafkaConsumer: (existing code): both the "heartbeat" and the application threads
  • KafkaConsumer: (consumer threading refactor code as a part of KIP-848 effort): the "background" threads
  • KafkaProducer: the "sender" thread

...

The following error codes can be returned in PushTelemetryResponse.

Error code

Reason

Client action

INVALID_REQUEST (42)

Client sent a PushTelemetryRequest when it has already sent a Terminating request.

Log an error and stop pushing metrics. This is viewed as a problem in the client implementation of metrics serialization that is not likely to be resolved by retrying.

INVALID_RECORD (87)

Broker failed to decode or validate the client’s encoded metrics.

Log an error and stop pushing metrics. This is viewed as a problem in the client implementation of metrics serialization that is not likely to be resolved by retrying.

TELEMETRY_TOO_LARGE (NEW)

Client sent a PushTelemetryRequest larger than the maximum size the broker will accept.

Reduce the metrics payload so that it does not exceed GetTelemetrySubscriptionsResponse.TelemetryMaxBytes.

UNKNOWN_SUBSCRIPTION_ID (NEW)

Client sent a PushTelemetryRequest with an invalid or outdated SubscriptionId. The configured subscriptions have changed.

Immediately send a GetTelemetrySubscriptionsRequest to update the client's subscriptions and get a new SubscriptionId.

UNSUPPORTED_COMPRESSION_TYPE (76)

Client’s compression type is not supported by the broker.

Immediately send a GetTelemetrySubscriptionsRequest to get an up-to-date list of the broker's supported compression types (and any subscription changes).
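
The table above can be read as a dispatch from error code to client reaction. The following is a minimal Python sketch of that dispatch, not the client implementation. ClientAction, ACTION_BY_ERROR, and action_for are hypothetical names introduced for illustration, and errors are keyed by name because the table assigns no numeric value to the two new codes.

```python
from enum import Enum


class ClientAction(Enum):
    """Hypothetical labels for the client reactions listed in the table."""
    STOP_PUSHING = "log an error and stop pushing metrics"
    SHRINK_PAYLOAD = "reduce the payload to fit TelemetryMaxBytes"
    REFRESH_SUBSCRIPTION = "immediately re-fetch the telemetry subscription"


# Error names come from the table; the mapping itself is illustrative.
ACTION_BY_ERROR = {
    "INVALID_REQUEST": ClientAction.STOP_PUSHING,
    "INVALID_RECORD": ClientAction.STOP_PUSHING,
    "TELEMETRY_TOO_LARGE": ClientAction.SHRINK_PAYLOAD,
    "UNKNOWN_SUBSCRIPTION_ID": ClientAction.REFRESH_SUBSCRIPTION,
    "UNSUPPORTED_COMPRESSION_TYPE": ClientAction.REFRESH_SUBSCRIPTION,
}


def action_for(error_name: str) -> ClientAction:
    """Return the table's client action for a PushTelemetryResponse error."""
    try:
        return ACTION_BY_ERROR[error_name]
    except KeyError:
        # Errors not shown in the table are outside the scope of this sketch.
        raise ValueError(f"no action listed for {error_name!r}") from None
```

Both errors that map to REFRESH_SUBSCRIPTION lead to the same request, which returns the new SubscriptionId and the broker's accepted compression types together.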

...

Public Interfaces

Kafka Protocol Changes

These new RPCs are only supported on KRaft clusters.

Code Block
GetTelemetrySubscriptionsRequestV0 {
    ClientInstanceId uuid                // UUID4 unique for this client instance.
                                         // Must be set to Null on the first request, and to the returned ClientInstanceId
                                         // from the first response for all subsequent requests to any broker.
}

GetTelemetrySubscriptionsResponseV0 {
    ThrottleTime int32                   // Standard throttling.
    ErrorCode int16                      // Error code.
    ClientInstanceId uuid                // Assigned client instance id if ClientInstanceId was Null in the request, else Null.
    SubscriptionId int32                 // Unique identifier for the current subscription set for this client instance.
    AcceptedCompressionTypes Array[int8] // The compression types the broker accepts for PushTelemetryRequest.CompressionType
                                         // as listed in MessageHeaderV2.Attributes.CompressionType. The array will be sorted in
                                         // preference order from higher to lower. The CompressionType of NONE will not be
                                         // present in the response from the broker, though the broker does support uncompressed
                                         // client telemetry if none of the accepted compression codecs are supported by the client.
    PushIntervalMs int32                 // Configured push interval, which is the lowest configured interval in the current subscription set.
    TelemetryMaxBytes int32              // The maximum bytes of binary data the broker accepts in PushTelemetryRequest.
    DeltaTemporality bool                // If True; monotonic/counter metrics are to be emitted as deltas to the previous sample.
                                         // If False; monotonic/counter metrics are to be emitted as cumulative absolute values.
    RequestedMetrics Array[string]       // Requested telemetry metrics prefix string match.
                                         // Empty array: No metrics subscribed.
                                         // Array[0] empty string: All metrics subscribed.
                                         // Array[..]: prefix string match.
}

PushTelemetryRequestV0 {
    ClientInstanceId uuid                // UUID4 unique for this client instance, as retrieved in the first GetTelemetrySubscriptionsRequest.
    SubscriptionId int32                 // SubscriptionId from the GetTelemetrySubscriptionsResponse for the collected metrics.
    Terminating bool                     // Client is terminating.
    CompressionType int8                 // Compression codec used for .Metrics (ZSTD, LZ4, Snappy, GZIP, None).
                                         // Same values as that of the current MessageHeaderV2.Attributes.
    Metrics binary                       // Metrics encoded in OpenTelemetry MetricsData v1 protobuf format.
}

PushTelemetryResponseV0 {
    ThrottleTime int32                   // Standard throttling.
    ErrorCode int16                      // Error code.
}
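
The RequestedMetrics and AcceptedCompressionTypes comments above describe two small client-side decisions: which metrics a subscription covers, and which codec to use for a push. The Python sketch below illustrates one way to read those rules. The function names and the metric names in the usage note are hypothetical, and the codec id 0 for "no compression" is assumed to follow the Kafka record batch attribute values.

```python
from typing import List, Set

# Assumption: 0 means "no compression", as in the record batch attributes.
COMPRESSION_NONE = 0


def is_subscribed(metric_name: str, requested_metrics: List[str]) -> bool:
    """Apply the RequestedMetrics rules to a single metric name."""
    if not requested_metrics:
        return False  # Empty array: no metrics subscribed.
    if requested_metrics[0] == "":
        return True  # Array[0] is the empty string: all metrics subscribed.
    # Otherwise each entry is matched as a prefix of the metric name.
    return any(metric_name.startswith(prefix) for prefix in requested_metrics)


def choose_compression(accepted: List[int], client_supported: Set[int]) -> int:
    """Pick the broker's most preferred codec that the client also supports."""
    for codec in accepted:  # Sorted by the broker from most to least preferred.
        if codec in client_supported:
            return codec
    # NONE is never listed by the broker but is still accepted as a fallback.
    return COMPRESSION_NONE
```

For example, with a hypothetical subscription of ["client.producer."], the metric "client.producer.x" is pushed while "client.consumer.x" is not. With accepted codecs [4, 3] and a client that supports only {3, 1}, the sketch picks 3.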

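The DeltaTemporality flag in GetTelemetrySubscriptionsResponseV0 changes how a counter is reported. The Python sketch below illustrates the difference under stated assumptions. CounterEncoder is a hypothetical helper, not part of any client, and it assumes that the first delta sample is measured against zero, which the schema comments do not specify.

```python
from typing import Dict


class CounterEncoder:
    """Hypothetical helper that reports a monotonic counter in either temporality."""

    def __init__(self, delta_temporality: bool) -> None:
        self.delta_temporality = delta_temporality
        self._last: Dict[str, int] = {}  # Last cumulative value seen per metric.

    def encode(self, name: str, cumulative_value: int) -> int:
        """Return the value to emit for the current sample of a counter."""
        if not self.delta_temporality:
            # Cumulative: emit the absolute value as-is.
            return cumulative_value
        # Delta: emit the change since the previous sample.
        # Assumption: the first sample is a delta from zero.
        previous = self._last.get(name, 0)
        self._last[name] = cumulative_value
        return cumulative_value - previous
```

Feeding cumulative samples 10 and then 25 for the same counter yields 10 and 15 with delta temporality, and 10 and 25 without it.
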
...