...

Error code

Reason

Client action

INVALID_RECORD  (87)

Broker failed to decode or validate the client’s encoded metrics.

Log an error and stop pushing metrics. This is viewed as a problem in the client implementation of metrics serialization that is not likely to be resolved by retrying.

TELEMETRY_TOO_LARGE  (NEW)

Client sent a PushTelemetryRequest larger than the maximum size the broker will accept.

Reduce the size of the metrics payload so that it does not exceed GetTelemetrySubscriptionsResponse.TelemetryMaxBytes.

UNKNOWN_SUBSCRIPTION_ID  (NEW)

Client sent a PushTelemetryRequest with an invalid or outdated SubscriptionId. The configured subscriptions have changed.

Immediately send a GetTelemetrySubscriptionsRequest to update the client's subscriptions and get a new SubscriptionId.

UNSUPPORTED_COMPRESSION_TYPE  (76)

Client’s compression type is not supported by the broker.

Immediately send a GetTelemetrySubscriptionsRequest to get an up-to-date list of the broker's supported compression types (and any subscription changes).
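The client actions in the table reduce to a small dispatch on the error returned in a PushTelemetryResponse. The following Java sketch is illustrative only: `TelemetryErrorHandling` and `ClientAction` are hypothetical names, errors are identified by name rather than by number because numeric codes for the new errors are not given here, and rejecting errors outside the table is an assumption of the sketch.

```java
import java.util.Locale;

// Hypothetical sketch: maps the error name from a PushTelemetryResponse to the
// client action given in the error code table. Not a Kafka API.
class TelemetryErrorHandling {

    enum ClientAction {
        CONTINUE,                 // no error: keep pushing at the normal interval
        STOP_PUSHING,             // log an error and stop pushing metrics
        SHRINK_PAYLOAD,           // reduce the payload to at most TelemetryMaxBytes
        REFRESH_SUBSCRIPTION_NOW  // immediately send a GetTelemetrySubscriptionsRequest
    }

    static ClientAction actionFor(String errorName) {
        switch (errorName.toUpperCase(Locale.ROOT)) {
            case "NONE":
                return ClientAction.CONTINUE;
            case "INVALID_RECORD":
                // A client-side serialization problem that retrying will not fix.
                return ClientAction.STOP_PUSHING;
            case "TELEMETRY_TOO_LARGE":
                return ClientAction.SHRINK_PAYLOAD;
            case "UNKNOWN_SUBSCRIPTION_ID":
            case "UNSUPPORTED_COMPRESSION_TYPE":
                // Both are resolved by fetching the current subscription, whose
                // response also lists the broker's accepted compression types.
                return ClientAction.REFRESH_SUBSCRIPTION_NOW;
            default:
                // Assumption of this sketch: errors outside the table are not handled here.
                throw new IllegalArgumentException("Unhandled telemetry error: " + errorName);
        }
    }

    public static void main(String[] args) {
        System.out.println(actionFor("UNKNOWN_SUBSCRIPTION_ID")); // REFRESH_SUBSCRIPTION_NOW
        System.out.println(actionFor("INVALID_RECORD"));          // STOP_PUSHING
    }
}
```

Grouping UNKNOWN_SUBSCRIPTION_ID and UNSUPPORTED_COMPRESSION_TYPE under one action reflects that the table prescribes the same remedy for both.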

...

GetTelemetrySubscriptionsRequestV0 {
    ClientInstanceId uuid                // UUID4 unique for this client instance.
                                         // Must be set to Null on the first request, and to the returned ClientInstanceId
                                         // from the first response for all subsequent requests to any broker.
}

GetTelemetrySubscriptionsResponseV0 {
    ThrottleTime int32                   // Standard throttling.
    ErrorCode int16                      // Error code.
    ClientInstanceId uuid                // Assigned client instance id if ClientInstanceId was Null in the request, else Null.
    SubscriptionId int32                 // Unique identifier for the current subscription set for this client instance.
    AcceptedCompressionTypes Array[int8] // The compression types the broker accepts for PushTelemetryRequest.CompressionType,
                                         // as listed in MessageHeaderV2.Attributes.CompressionType. The array is sorted in
                                         // preference order, from highest to lowest. The CompressionType NONE is never
                                         // present in the response, but the broker does accept uncompressed client
                                         // telemetry if the client supports none of the accepted compression codecs.
    PushIntervalMs int32                 // Configured push interval, which is the lowest configured interval in the current subscription set.
    TelemetryMaxBytes int32              // The maximum number of bytes of binary data the broker accepts in PushTelemetryRequest.
    DeltaTemporality bool                // If True: monotonic/counter metrics are to be emitted as deltas to the previous sample.
                                         // If False: monotonic/counter metrics are to be emitted as cumulative absolute values.
    RequestedMetrics Array[string]       // Requested telemetry metrics prefix string match.
                                         // Empty array: no metrics subscribed.
                                         // Array[0] empty string: all metrics subscribed.
                                         // Array[..]: prefix string match.
}

PushTelemetryRequestV0 {
    ClientInstanceId uuid                // UUID4 unique for this client instance, as returned in the first GetTelemetrySubscriptionsResponse.
    SubscriptionId int32                 // SubscriptionId from the GetTelemetrySubscriptionsResponse for the collected metrics.
    Terminating bool                     // Client is terminating.
    CompressionType int8                 // Compression codec used for .Metrics (ZSTD, LZ4, Snappy, GZIP, None).
                                         // Same values as those of the current MessageHeaderV2.Attributes.
    Metrics binary                       // Metrics encoded in OpenTelemetry MetricsData v1 protobuf format.
}

PushTelemetryResponseV0 {
    ThrottleTime int32                   // Standard throttling.
    ErrorCode int16                      // Error code.
}
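Two of the response fields imply small pieces of client logic: choosing a codec from AcceptedCompressionTypes and honoring DeltaTemporality. The Java sketch below is a hypothetical illustration, not client code from any Kafka release. It assumes the compression ids used in Kafka record batch attributes (0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd), and it assumes the first sample of a counter is emitted whole when deltas are requested.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical client-side helpers for two GetTelemetrySubscriptionsResponse fields.
class TelemetryResponseHandling {

    // Compression ids as in Kafka record batch attributes:
    // 0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd.
    static final byte NONE = 0;

    /**
     * AcceptedCompressionTypes is sorted from most to least preferred, so the first
     * entry the client also supports wins. NONE is never listed but is always
     * accepted, so it is the fallback.
     */
    static byte chooseCompression(List<Byte> acceptedByBroker, Set<Byte> supportedByClient) {
        for (byte type : acceptedByBroker) {
            if (supportedByClient.contains(type)) {
                return type;
            }
        }
        return NONE;
    }

    // Last cumulative value seen per metric name.
    private final Map<String, Long> lastCumulative = new HashMap<>();

    /**
     * Value to emit for a monotonic counter. With DeltaTemporality, the change since the
     * previous sample (assumption: the first sample is emitted whole); otherwise the
     * cumulative value. Counter resets are not handled in this sketch.
     */
    long valueToEmit(String metricName, long cumulative, boolean deltaTemporality) {
        Long previous = lastCumulative.put(metricName, cumulative);
        if (!deltaTemporality) {
            return cumulative;
        }
        return previous == null ? cumulative : cumulative - previous;
    }

    public static void main(String[] args) {
        // Broker prefers zstd (4), then lz4 (3); this client only has lz4 and gzip.
        System.out.println(chooseCompression(List.of((byte) 4, (byte) 3), Set.of((byte) 3, (byte) 1))); // 3
        TelemetryResponseHandling handler = new TelemetryResponseHandling();
        System.out.println(handler.valueToEmit("requests", 10, true)); // 10
        System.out.println(handler.valueToEmit("requests", 25, true)); // 15
    }
}
```

Falling back to NONE instead of failing follows the AcceptedCompressionTypes comment: uncompressed telemetry is accepted even though NONE never appears in the list.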

...

This also allows a plugin to reuse the existing labels passed through the MetricsContext (e.g. broker, cluster id, and configured labels) and add them to the OpenTelemetry resource labels as needed.

The broker only reports support for GetTelemetrySubscriptions and PushTelemetry requests in its ApiVersionsResponse if at least one of its configured MetricsReporters implements the ClientTelemetry interface. This means that clients will not attempt to push metrics to brokers that are not capable of receiving them.
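The advertisement rule amounts to a type check over the configured reporters. In this minimal Java sketch, `MetricsReporter` and `ClientTelemetry` are simplified local stand-ins for the real interfaces (so the example compiles on its own), and `advertiseTelemetryApis`, `PlainReporter`, and `TelemetryReporter` are hypothetical names.

```java
import java.util.List;

// Hypothetical sketch of the advertisement rule. MetricsReporter and
// ClientTelemetry are simplified local stand-ins for the real Kafka interfaces.
class TelemetryApiAdvertisement {

    interface MetricsReporter { }

    interface ClientTelemetry { }

    // A reporter without client telemetry support.
    static class PlainReporter implements MetricsReporter { }

    // A reporter that also accepts client telemetry.
    static class TelemetryReporter implements MetricsReporter, ClientTelemetry { }

    /**
     * True if GetTelemetrySubscriptions and PushTelemetry should be listed in the
     * ApiVersionsResponse, i.e. at least one reporter implements ClientTelemetry.
     */
    static boolean advertiseTelemetryApis(List<? extends MetricsReporter> reporters) {
        return reporters.stream().anyMatch(r -> r instanceof ClientTelemetry);
    }

    public static void main(String[] args) {
        System.out.println(advertiseTelemetryApis(List.of(new PlainReporter())));                          // false
        System.out.println(advertiseTelemetryApis(List.of(new PlainReporter(), new TelemetryReporter()))); // true
    }
}
```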


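import java.nio.ByteBuffer;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

/**
 * A {@link MetricsReporter} may implement this interface to indicate support for collecting client telemetry on the server side.
 */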
@InterfaceStability.Evolving
public interface ClientTelemetry {

    /**
     * Called by the broker to create a ClientTelemetryReceiver instance.
     * This instance may be cached by the broker.
     *
     * This method will always be called after the initial call to
     * {@link MetricsReporter#contextChange(MetricsContext)}
     * on the MetricsReporter implementing this interface.
     *
     * @return a ClientTelemetryReceiver for the broker to pass client telemetry to
     */
    ClientTelemetryReceiver clientReceiver();
}

@InterfaceStability.Evolving
public interface ClientTelemetryReceiver {
    /**
     * Called by the broker when a client reports telemetry. The associated request context can be used
     * by the plugin to retrieve additional client information such as client ids or endpoints.
     *
     * This method may be called from the request handling thread, and as such should avoid blocking.
     *
     * @param context the client request context for the corresponding PushTelemetryRequest api call
     * @param payload the encoded telemetry payload as sent by the client
     */
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);
}

@InterfaceStability.Evolving
public interface ClientTelemetryPayload {

    /**
     * Client's instance id.
     */
    Uuid clientInstanceId();

    /**
     * Indicates whether the client is terminating and sending its last metrics push.
     */
    boolean isTerminating();

    /**
     * Metrics data content-type / serialization format.
     * Currently "application/x-protobuf;type=otlp+metrics0.19"
     */
    String contentType();

    /**
     * Serialized uncompressed metrics data.
     */
    ByteBuffer data();
}
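Because exportMetrics may run on a request handling thread, a plugin will typically hand payloads off rather than process them inline. The Java sketch below shows one such design under stated assumptions: the interfaces are simplified stand-ins for the ones above (java.util.UUID replaces Kafka's Uuid, and the request context is omitted), the bounded queue and drop-on-overflow policy are choices of this sketch, and the payload bytes are copied because this section does not say whether the broker's ByteBuffer may be retained after the call returns.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class QueueingTelemetryReceiver {

    // Simplified stand-in for ClientTelemetryPayload.
    interface ClientTelemetryPayload {
        UUID clientInstanceId();
        boolean isTerminating();
        String contentType();
        ByteBuffer data();
    }

    // Simplified stand-in for ClientTelemetryReceiver; the request context is omitted.
    interface ClientTelemetryReceiver {
        void exportMetrics(ClientTelemetryPayload payload);
    }

    // Immutable copy of what the exporter thread needs.
    static final class QueuedPush {
        final UUID clientInstanceId;
        final boolean terminating;
        final byte[] data;

        QueuedPush(UUID clientInstanceId, boolean terminating, byte[] data) {
            this.clientInstanceId = clientInstanceId;
            this.terminating = terminating;
            this.data = data;
        }
    }

    static final class Receiver implements ClientTelemetryReceiver {
        private final BlockingQueue<QueuedPush> queue;
        private final AtomicLong dropped = new AtomicLong();

        Receiver(int capacity) {
            this.queue = new ArrayBlockingQueue<>(capacity);
        }

        @Override
        public void exportMetrics(ClientTelemetryPayload payload) {
            // Copy via duplicate() so the caller's buffer position is left untouched.
            ByteBuffer buf = payload.data().duplicate();
            byte[] bytes = new byte[buf.remaining()];
            buf.get(bytes);
            // offer() never blocks: if the exporter falls behind, drop and count.
            if (!queue.offer(new QueuedPush(payload.clientInstanceId(), payload.isTerminating(), bytes))) {
                dropped.incrementAndGet();
            }
        }

        // Called from a dedicated exporter thread, which is allowed to block.
        QueuedPush take(long timeoutMs) throws InterruptedException {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        }

        long droppedCount() {
            return dropped.get();
        }
    }

    // Builds an in-memory payload for demos and tests.
    static ClientTelemetryPayload payload(UUID id, String body) {
        ByteBuffer data = ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8));
        return new ClientTelemetryPayload() {
            public UUID clientInstanceId() { return id; }
            public boolean isTerminating() { return false; }
            public String contentType() { return "application/x-protobuf;type=otlp+metrics0.19"; }
            public ByteBuffer data() { return data; }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        Receiver receiver = new Receiver(1);
        receiver.exportMetrics(payload(UUID.randomUUID(), "first"));
        receiver.exportMetrics(payload(UUID.randomUUID(), "second")); // queue full: dropped
        System.out.println(receiver.droppedCount()); // 1
        System.out.println(new String(receiver.take(100).data, StandardCharsets.UTF_8)); // first
    }
}
```

Dropping on overflow trades completeness for keeping the request path non-blocking; a plugin could instead keep only the newest push per client, which this sketch does not attempt.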

...

If the client has not yet requested a client instance id from the broker, this call may block up to the duration of the timeout. If no client instance id can be retrieved within the timeout, an error is returned (for example, a timeout, the feature not being supported by the broker, or an authentication failure).
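One way to get this "return at once if known, otherwise block up to the timeout" behavior is to model the id as a future. This Java sketch is hypothetical: `ClientInstanceIdHolder`, `awaitInstanceId`, `onSubscriptionResponse`, and `onFailure` are illustrative names, not the client API this section describes, and wrapping failures in IllegalStateException is an assumption.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: the telemetry machinery completes the future when the first
// GetTelemetrySubscriptionsResponse arrives, or fails it (e.g. feature not supported
// by the broker, authentication failure).
class ClientInstanceIdHolder {

    private final CompletableFuture<UUID> instanceId = new CompletableFuture<>();

    void onSubscriptionResponse(UUID assignedId) {
        instanceId.complete(assignedId);
    }

    void onFailure(Throwable error) {
        instanceId.completeExceptionally(error);
    }

    /** Returns immediately once the id is known; otherwise blocks for at most the timeout. */
    UUID awaitInstanceId(Duration timeout) throws TimeoutException, InterruptedException {
        try {
            return instanceId.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Could not obtain client instance id", e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        ClientInstanceIdHolder holder = new ClientInstanceIdHolder();
        try {
            holder.awaitInstanceId(Duration.ofMillis(50));
        } catch (TimeoutException e) {
            System.out.println("timed out"); // no response has arrived yet
        }
        UUID id = UUID.randomUUID();
        holder.onSubscriptionResponse(id);
        System.out.println(holder.awaitInstanceId(Duration.ofMillis(50)).equals(id)); // true
    }
}
```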

Broker configuration

Configuration

Description

Values

telemetry.max.bytes

The maximum size (after compression, if compression is used) of telemetry pushed from a client to the broker.

int, default: 1048576, valid values: [1,...]
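Because the limit applies after compression, a highly compressible payload can be much larger than telemetry.max.bytes before it is compressed. This Java sketch is a hypothetical illustration: `TelemetrySizeCheck` is not a Kafka class, it assumes a payload of exactly the limit is accepted, and it uses java.util.zip GZIP only to show the effect of compression on the checked size.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical broker-side check for telemetry.max.bytes; not a Kafka class.
class TelemetrySizeCheck {

    // Default from the configuration table: 1048576 bytes = 1024 * 1024 (1 MiB).
    static final int DEFAULT_TELEMETRY_MAX_BYTES = 1024 * 1024;

    enum Result { NONE, TELEMETRY_TOO_LARGE }

    /**
     * Compares the Metrics bytes as sent (already compressed, if the client compressed
     * them) with the limit. Assumes a payload of exactly telemetryMaxBytes is accepted.
     */
    static Result check(byte[] metricsAsSent, int telemetryMaxBytes) {
        if (telemetryMaxBytes < 1) {
            throw new IllegalArgumentException("telemetry.max.bytes must be at least 1");
        }
        return metricsAsSent.length > telemetryMaxBytes ? Result.TELEMETRY_TOO_LARGE : Result.NONE;
    }

    // GZIP compression via java.util.zip, used here only to illustrate compressed size.
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = new byte[2 * DEFAULT_TELEMETRY_MAX_BYTES]; // 2 MiB of zeros, highly compressible
        System.out.println(check(raw, DEFAULT_TELEMETRY_MAX_BYTES));       // TELEMETRY_TOO_LARGE
        System.out.println(check(gzip(raw), DEFAULT_TELEMETRY_MAX_BYTES)); // NONE
    }
}
```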

Client configuration

This applies to producers, consumers, the admin client, and of course embedded uses of these clients in frameworks such as Kafka Connect.

...

Configuration

Description

Values

metrics

A list of telemetry metric name prefixes that specify the metrics of interest.

An empty list means no metrics are subscribed.

A list containing just an empty string means all metrics are subscribed.

Otherwise, the list entries are prefix-matched against the metric names.

interval.ms

The client metrics push interval in milliseconds.

Default: 300000 (5 minutes), minimum: 100, maximum: 3600000 (1 hour)

match

The match criteria for selecting which clients the subscription applies to. If a client matches all of these criteria, the client matches the subscription.

A list of key-value pairs.

The valid keys are:

  • client_instance_id - CLIENT_INSTANCE_ID UUID string representation.
  • client_id - client's reported client.id in the GetTelemetrySubscriptionsRequest.
  • client_software_name - client software implementation name.
  • client_software_version - client software implementation version.
  • client_source_address - client connection's source address from the broker's point of view.
  • client_source_port - client connection's source port from the broker's point of view.

The values are anchored regular expressions.
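The subscription fields above can be evaluated with plain prefix and regular expression matching. The Java sketch below is illustrative; `SubscriptionMatching`, `Subscription`, and its methods are hypothetical names. It treats a missing client property as a non-match, reads "anchored" as a whole-value match (java.util.regex.Matcher.matches()), and derives the push interval as the lowest interval.ms among matching subscriptions, following the PushIntervalMs description.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.regex.Pattern;

// Hypothetical evaluation of client metrics subscriptions; names are illustrative.
class SubscriptionMatching {

    static final class Subscription {
        final List<String> metrics;                          // metric name prefixes
        final int intervalMs;                                // interval.ms
        final Map<String, Pattern> match = new HashMap<>();  // e.g. client_id -> regex

        Subscription(List<String> metrics, int intervalMs, Map<String, String> criteria) {
            this.metrics = metrics;
            this.intervalMs = intervalMs;
            criteria.forEach((key, regex) -> match.put(key, Pattern.compile(regex)));
        }

        /** Every criterion must match; matches() anchors the regex to the whole value. */
        boolean matchesClient(Map<String, String> clientProperties) {
            for (Map.Entry<String, Pattern> criterion : match.entrySet()) {
                String value = clientProperties.get(criterion.getKey());
                if (value == null || !criterion.getValue().matcher(value).matches()) {
                    return false; // assumption: a missing property is a non-match
                }
            }
            return true;
        }

        /** Empty list: nothing. [""]: everything, since every name starts with "". Else prefix match. */
        boolean wantsMetric(String metricName) {
            for (String prefix : metrics) {
                if (metricName.startsWith(prefix)) {
                    return true;
                }
            }
            return false;
        }
    }

    /** The lowest interval.ms among the subscriptions matching this client, if any. */
    static OptionalInt pushIntervalMs(List<Subscription> subscriptions, Map<String, String> clientProperties) {
        return subscriptions.stream()
                .filter(s -> s.matchesClient(clientProperties))
                .mapToInt(s -> s.intervalMs)
                .min();
    }

    public static void main(String[] args) {
        Subscription producers = new Subscription(List.of("example.producer."), 60000, Map.of("client_id", "app-.*"));
        Subscription everything = new Subscription(List.of(""), 300000, Map.of());
        Map<String, String> client = Map.of("client_id", "app-1");
        System.out.println(producers.matchesClient(client));                 // true
        System.out.println(producers.wantsMetric("example.consumer.bytes")); // false
        System.out.println(pushIntervalMs(List.of(producers, everything), client).getAsInt()); // 60000
    }
}
```

Note that the "list containing just an empty string" rule needs no special case under prefix matching. In this sketch a subscription with no match criteria matches every client, since an empty set of criteria is trivially satisfied.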

New error codes

TELEMETRY_TOO_LARGE  - Client sent a PushTelemetryRequest with a payload that was too large.

UNKNOWN_SUBSCRIPTION_ID  - Client sent a PushTelemetryRequest with an invalid or outdated SubscriptionId. The configured subscriptions have changed.

...

We could also add proper support for histogram metrics in the client, which would fit nicely with histograms in OpenTelemetry metrics.

In network environments with network proxies (such as Kubernetes ingress) on the path between the client and the broker, it may be difficult to obtain the originating client's IP address. One way to address this in the future would be to support the PROXY protocol in Kafka.

Compatibility, Deprecation, and Migration Plan

...