...

  • New PushTelemetryRequest protocol request type.
  • Updated metrics receiver interface in the broker to accommodate the new semantics and format.
  • A new CLIENT_METRICS ConfigEntry resource type for configuring metrics subscriptions.
  • kafka-client-metrics.sh CLI script to configure and list client metrics subscriptions.

Protocol

Code Block
GetTelemetrySubscriptionsRequestV0 {
    ClientInstanceId uuid                   // UUID4 unique for this client instance.
                                            // Must be set to Null on the first request, and to the
                                            // returned ClientInstanceId from the first response
                                            // for all subsequent requests to any broker.
}

GetTelemetrySubscriptionsResponseV0 {
    ThrottleTime int32                      // Standard throttling
    ErrorCode int16                         // Error code
    ClientInstanceId uuid                   // Will be set to a generated unique id if the
                                            // request ClientInstanceId was Null, else
                                            // this field will be set to Null.
    SubscriptionId int32                    // Unique identifier for the current subscription set for this client instance.
    AcceptedCompressionTypes Array[int8]    // The compression types the broker accepts for PushTelemetryRequest.CompressionType.
                                            // Calculated as a bitmask of (1 << MessageHeaderV2.Attributes.CompressionType).
    PushIntervalMs int32                    // Configured push interval, which is the lowest configured
                                            // interval in the current subscription set.
    RequestedMetrics Array[string]          // Requested Metrics prefix string match.
                                            // Empty array: No metrics subscribed.
                                            // Array[0] empty string: All metrics subscribed.
                                            // Array[..]: prefix string match
}

PushTelemetryRequestV0 {
    ClientInstanceId uuid                   // UUID4 unique for this client instance.
                                            // Must be set to Null on the first request, and to the
                                            // returned ClientInstanceId from the first response
                                            // for all subsequent requests to any broker.
    SubscriptionId int32                    // SubscriptionId from the GetTelemetrySubscriptionsResponse for the collected metrics.
    Terminating bool                        // Client is terminating.
    CompressionType int8                    // Compression codec used for .Metrics (ZSTD, LZ4, Snappy, GZIP, None).
                                            // Same values as that of the current MessageHeaderV2.Attributes.
    Metrics binary                          // Encoded metrics (see Metrics format), compressed according to CompressionType.
}

PushTelemetryResponseV0 {
    ThrottleTime int32                      // Standard and metric-specific throttling
    ErrorCode int16                         // Error code
}
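
The RequestedMetrics semantics described in the field comments (empty array, single empty string, prefix list) can be illustrated with a minimal sketch. The class name, method name and metric names below are hypothetical and not part of this proposal; the sketch only assumes that "prefix string match" means a plain string-prefix comparison.

```java
import java.util.List;

// Hypothetical helper, not part of the proposal: decides whether a metric
// is covered by the RequestedMetrics array received from the broker.
final class MetricSubscriptionMatcherSketch {

    // Empty list: nothing matches. A single empty string matches every
    // metric name, because every string starts with "". Otherwise a metric
    // is subscribed if its name starts with any of the listed prefixes.
    static boolean isSubscribed(List<String> requestedMetrics, String metricName) {
        for (String prefix : requestedMetrics) {
            if (metricName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

Note that the "all metrics" case needs no special handling in this sketch, since an empty prefix matches any name.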


Metrics format

The metrics format for PushTelemetryRequestV0 is OpenTelemetry Protobuf protocol definitions version 0.10.

Future versions of PushTelemetryRequest and GetTelemetrySubscriptionsRequest may include a content-type field to allow for updated OTLP format versions (or additional formats), but this field is currently not included since only one format is specified by this proposal.


SubscriptionId

The SubscriptionId is a unique identifier for a client instance's subscription set. The id is generated by calculating a CRC32 of the configured metrics subscriptions matching the given client, including the PushIntervalMs, XORed with the ClientInstanceId. This SubscriptionId is returned to the client in the GetTelemetrySubscriptionsResponse, and the client passes that SubscriptionId in each subsequent PushTelemetryRequest for those received metrics. If the configured subscriptions are updated in a way that changes the subscription set for a client instance, the SubscriptionId is recalculated. Upon the next PushTelemetryRequest using the previous SubscriptionId, the broker will find that the received and expected SubscriptionIds differ and will return UnknownSubscriptionId to the client. When a client receives this error code it will immediately send a GetTelemetrySubscriptionsRequest to retrieve the new subscription set along with a new SubscriptionId.
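
As an illustration of the calculation described above, the following is a minimal sketch rather than the broker's actual implementation. The text does not specify how the subscriptions are serialized before the CRC32 or how a 128-bit UUID is XORed with a 32-bit checksum, so both choices here are assumptions: the prefixes are fed in as UTF-8 with a zero-byte separator, and the UUID is folded to 32 bits with UUID.hashCode(). The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import java.util.zip.CRC32;

// Hypothetical sketch of a SubscriptionId calculation.
final class SubscriptionIdSketch {

    static int subscriptionId(List<String> metricPrefixes, int pushIntervalMs, UUID clientInstanceId) {
        CRC32 crc = new CRC32();
        for (String prefix : metricPrefixes) {
            crc.update(prefix.getBytes(StandardCharsets.UTF_8));
            crc.update(0); // assumed separator so that ["ab"] and ["a", "b"] differ
        }
        // Include the push interval so that an interval change yields a new id.
        crc.update(ByteBuffer.allocate(4).putInt(pushIntervalMs).array());
        // Assumption: fold the 128-bit UUID to 32 bits before XORing.
        return (int) crc.getValue() ^ clientInstanceId.hashCode();
    }
}
```

Because the id is derived purely from the subscription set and the client instance id, the broker can recompute it on every request instead of storing it.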

This mechanism provides a way for the broker to propagate an updated subscription set to the client and is similar to the use of Epochs in other parts of the protocol, but simplified in that no state needs to be persisted: the configured subscription set plus the client instance id is the identifier itself.


Broker metrics receiver

The broker will expose a plugin interface for client telemetry to be forwarded to or collected by external systems. In particular, since we already collect data in OpenTelemetry format, one goal is to make it possible for a plugin to forward metrics directly to an OpenTelemetry collector.

...

Code Block
/**
 * A {@link MetricsReporter} may implement this interface to indicate support for collecting client telemetry on the server side
 */
@InterfaceStability.Evolving
public interface ClientTelemetry {

    /**
     * Called by the broker to create a ClientTelemetryReceiver instance.
     * This instance may be cached by the broker.
     *
     * This method will always be called after the initial call to
     * {@link MetricsReporter#contextChange(MetricsContext)}
     * on the MetricsReporter implementing this interface.
     *
     * @return the receiver to which the broker hands client telemetry
     */
    ClientTelemetryReceiver clientReceiver();
}

@InterfaceStability.Evolving
public interface ClientTelemetryReceiver {
    /**
     * Called by the broker when a client reports telemetry. The associated request context can be used
     * by the plugin to retrieve additional client information such as client ids or endpoints.
     *
     * This method may be called from the request handling thread, and as such should avoid blocking.
     *
     * @param context the client request context for the corresponding PushTelemetryRequest api call
     * @param payload the encoded telemetry payload as sent by the client
     */
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);

    /**
     * Returns the list of accepted content types for client metric payloads supported by this receiver.
     * This method may be used by the broker to inform connecting clients of supported payload formats.
     *
     * <p>e.g.
     * <code>application/x-protobuf+zstd;type=otlp+metrics010</code>,
     * </p>
     *
     * @return collection of supported mime types
     */
    Collection<String> acceptedContentTypes();
}

@InterfaceStability.Evolving
public interface ClientTelemetryPayload {

    String clientInstanceId();

    String contentType();

    ByteBuffer data();
}

...

This proposal suggests that as part of the initial metrics handshake the broker generates a unique client instance id (a UUID4) and returns it to the client. The client must use the returned client instance id for the remaining lifetime of the client instance, regardless of which broker it communicates with.

The client should provide an API for the application to read the generated client instance id, to assist in mapping and identifying the client based on the collected metrics. As an alternative or complement, the client may log its client instance id as soon as it has acquired it from the cluster. This allows live and post-mortem correlation between collected metrics and a client instance.

...

While the protocol-level request will allow for other metrics formats, Kafka will only ship with OpenTelemetry and it is required for a client that claims support of this KIP to use the OpenTelemetry v0.10 protobuf format.

Metric payloads are encoded as OpenTelemetry ExportMetricsServiceRequest protobuf objects.

...

client_software_name=confluent-kafka-python
client_software_version=v2.1.3
client_instance_id=someClientId-B64CD139-3975-440A-91D4
idempotence=true
transactional_id=someTxnApp

...

As metric names and labels are highly repeated it is recommended that the serialized metrics are compressed prior to sending them to the broker if the serialized uncompressed size exceeds 1000 bytes.

The PushTelemetryRequest.CompressionType must then be set to the corresponding compression type value as defined for MessageV2.

Tests indicate that a compression ratio up to 10x is possible for the standard metrics.
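
A minimal sketch of the size-based decision described above, under stated assumptions: it uses GZIP only because that codec is available in the Java standard library, and it uses the message-format attribute values 0 (none) and 1 (gzip) for the compression type. The class, method and field names are hypothetical, and a real client would also restrict itself to the compression types the broker reports as accepted.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: compress serialized metrics only above a size threshold.
final class MetricsCompressionSketch {
    static final int COMPRESSION_THRESHOLD_BYTES = 1000;
    static final byte COMPRESSION_NONE = 0; // message-format attribute value
    static final byte COMPRESSION_GZIP = 1; // message-format attribute value

    // Payload plus the compression type to put in the request.
    static final class Encoded {
        final byte compressionType;
        final byte[] data;

        Encoded(byte compressionType, byte[] data) {
            this.compressionType = compressionType;
            this.data = data;
        }
    }

    static Encoded maybeCompress(byte[] serializedMetrics) {
        if (serializedMetrics.length <= COMPRESSION_THRESHOLD_BYTES) {
            return new Encoded(COMPRESSION_NONE, serializedMetrics);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(serializedMetrics);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Encoded(COMPRESSION_GZIP, out.toByteArray());
    }
}
```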

...

A client that supports this metric interface and identifies a supportive broker (through detecting at least GetTelemetrySubscriptionsRequestV0 in the ApiVersionResponse) will start off by sending a GetTelemetrySubscriptionsRequest with the ClientInstanceId field set to Null to one randomly selected connected broker to gather its client instance id, the subscribed metrics, the push interval, accepted compression types, etc. This handshake with a Null ClientInstanceId is only performed once for a client instance's lifetime. Subsequent GetTelemetrySubscriptionsRequests must include the ClientInstanceId returned in the first response, regardless of broker.

Upon receiving the GetTelemetrySubscriptionsResponse the client shall update its internal metrics collection to match the received subscription (absolute update) and update its push interval timer according to the returned PushIntervalMs.

If GetTelemetrySubscriptionsResponse.RequestedMetrics indicates that no metrics are desired (RequestedMetrics is Null) the client should send a new GetTelemetrySubscriptionsRequest according to the PushIntervalMs.
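
The client-side bookkeeping described in the paragraphs above could look roughly like the following sketch. All names are hypothetical; it assumes the client keeps the first assigned client instance id for its lifetime and replaces, rather than merges, its subscription state on every response.

```java
import java.util.List;
import java.util.UUID;

// Hypothetical sketch of client-side subscription state.
final class TelemetrySubscriptionStateSketch {
    private UUID clientInstanceId;                      // null until the broker assigns one
    private int subscriptionId;
    private int pushIntervalMs;
    private List<String> requestedMetrics = List.of();

    // Applies a subscriptions response as an absolute update.
    void onSubscriptionsResponse(UUID assignedId, int newSubscriptionId,
                                 int newPushIntervalMs, List<String> newRequestedMetrics) {
        if (clientInstanceId == null && assignedId != null) {
            clientInstanceId = assignedId;              // assigned once, then reused
        }
        subscriptionId = newSubscriptionId;
        pushIntervalMs = newPushIntervalMs;
        requestedMetrics = List.copyOf(newRequestedMetrics); // replace, never merge
    }

    UUID clientInstanceId() { return clientInstanceId; }
    int subscriptionId() { return subscriptionId; }
    int pushIntervalMs() { return pushIntervalMs; }
    List<String> requestedMetrics() { return requestedMetrics; }

    // With no active subscription the client only re-polls the subscriptions.
    boolean hasActiveSubscription() { return !requestedMetrics.isEmpty(); }
}
```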

Client termination

When a client is being shut down it should send its final metrics regardless of the PushIntervalMs time, and only if the client has an active metrics subscription.

To prevent the receiving broker’s metrics rate-limiter from discarding this out-of-profile push, the PushTelemetryRequest.Terminating field must be set to true. A broker must only allow one such consecutive digression and otherwise throttle the client as if this field was not set.
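
One possible reading of the rule above, sketched as a per-client limiter. This is an interpretation, not the broker's implementation: it assumes a push is "in profile" once the push interval has elapsed since the last in-profile push, and that the single Terminating exception becomes available again after the next in-profile push. All names are hypothetical.

```java
// Hypothetical per-client rate limiter with a one-off Terminating exception.
final class PushRateLimiterSketch {
    private final long pushIntervalMs;
    private boolean hasPushed = false;
    private long lastInProfilePushMs;
    private boolean terminatingExceptionUsed = false;

    PushRateLimiterSketch(long pushIntervalMs) {
        this.pushIntervalMs = pushIntervalMs;
    }

    // Returns true if the push is accepted, false if it should be throttled.
    boolean allow(long nowMs, boolean terminating) {
        boolean inProfile = !hasPushed || nowMs - lastInProfilePushMs >= pushIntervalMs;
        if (inProfile) {
            hasPushed = true;
            lastInProfilePushMs = nowMs;
            terminatingExceptionUsed = false; // assumption: exception resets here
            return true;
        }
        if (terminating && !terminatingExceptionUsed) {
            terminatingExceptionUsed = true;  // allow exactly one early push
            return true;
        }
        return false;                         // treated as if Terminating was not set
    }
}
```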

...

Actions to be taken by the client if the GetTelemetrySubscriptionsResponse.ErrorCode or PushTelemetryResponse.ErrorCode is set to a non-zero value:


Error code | Reason | Client action
..AuthorizationFailed | Client is not permitted to send metrics. | Log a warning to the application and schedule the next GetTelemetrySubscriptionsRequest in 30 minutes.
InvalidRecord | Broker failed to decode or validate the client’s encoded metrics. | Log a warning to the application and schedule the next GetTelemetrySubscriptionsRequest in 5 minutes.
FunctionalityNotEnabled | Client sent a GetTelemetrySubscriptionsRequest to a broker which does not have a client metrics receiver plugin configured. | Log a warning to the application and schedule the next GetTelemetrySubscriptionsRequest in 30 minutes, preferably on another broker.
RateLimited | Client sent a non-Terminate push request before the NextPushMs time had expired. | The response will include only the NextPushMs which the client will use to adjust its next push time.
UnsupportedCompressionType | Client’s compression type is not supported by the broker. | The client should switch to another compression type. When all compression types are exhausted the application should fall back to uncompressed format.
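
The fixed retry delays listed above can be captured in a small lookup, sketched here with hypothetical names. Only the errors that carry a fixed delay are mapped; RateLimited and UnsupportedCompressionType require other handling and return an empty result in this sketch.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical sketch: maps telemetry error codes to the retry delay a
// client could apply before its next GetTelemetrySubscriptionsRequest.
final class TelemetryErrorBackoffSketch {

    enum TelemetryError {
        AUTHORIZATION_FAILED,
        INVALID_RECORD,
        FUNCTIONALITY_NOT_ENABLED,
        RATE_LIMITED,
        UNSUPPORTED_COMPRESSION_TYPE
    }

    static Optional<Duration> retryDelay(TelemetryError error) {
        switch (error) {
            case AUTHORIZATION_FAILED:
            case FUNCTIONALITY_NOT_ENABLED:
                return Optional.of(Duration.ofMinutes(30));
            case INVALID_RECORD:
                return Optional.of(Duration.ofMinutes(5));
            default:
                // No fixed delay: handled by adjusting the push time or
                // by switching compression type instead.
                return Optional.empty();
        }
    }
}
```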

...