...
Code Block |
---|
PushTelemetryRequest { ClientInstanceId string // client.id + "-" + UUID4 unique for this client instance. // Must be set to Null on the first request, and to the // returned ClientInstanceId from the first response // for all subsequent requests to any broker. Terminating bool // Client is terminating. ContentType string // "application/x-protobuf;type=otlp+metrics08" or "application/x-protobuf+zstd;type=otlp+metrics08" Metrics binary // Format specified by ContentType, possibly compressed. } PushTelemetryResponse { ThrottleTime int32 // Standard and metric-specific throttling ErrorCode int16 // Error code ClientInstanceId string // Will be set to a generated unique id if the // request ClientInstanceId was Null, else // this field will be set to Null. AcceptedContentTypes Array[string] // Accepted metric formats. // Only returned if Request.ContentType is null // or empty, or ErrorCode is set to // UnsupportedFormat(?). // Also includes the supported compression types. NextPushMs int32 // Delay until next PushTelemetry RequestedMetrics Array[string] // Requested Metrics prefix string match. // Empty array: No metrics subscribed. // Array[0] empty string: All metrics subscribed. // Array[..]: prefix string match } |
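
The RequestedMetrics matching rules in the response schema above can be sketched on the client side roughly as follows. This is only an illustration under stated assumptions: the class name `MetricSubscription`, the method `isSubscribed`, and the metric names used in `main` are hypothetical and are not defined by this proposal.

```java
import java.util.List;

/** Hypothetical helper illustrating the RequestedMetrics prefix-match semantics. */
final class MetricSubscription {
    private MetricSubscription() {}

    /**
     * Returns true if metricName is selected by the RequestedMetrics array.
     * Empty array: no metrics subscribed.
     * First element is the empty string: all metrics subscribed.
     * Otherwise: a metric is selected if any element is a prefix of its name.
     */
    static boolean isSubscribed(List<String> requestedMetrics, String metricName) {
        if (requestedMetrics.isEmpty()) {
            return false;
        }
        if (requestedMetrics.get(0).isEmpty()) {
            return true;
        }
        for (String prefix : requestedMetrics) {
            if (metricName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Metric names below are made up for the example.
        List<String> requested = List.of("org.apache.kafka.producer.");
        System.out.println(isSubscribed(requested, "org.apache.kafka.producer.record.queue.bytes"));
        System.out.println(isSubscribed(List.of(), "org.apache.kafka.producer.record.queue.bytes"));
    }
}
```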
...
The ContentType in the client's PushTelemetryRequest must include a type parameter describing the data format, in this case otlp+metrics08 for OpenTelemetry protocol v0.8. This is needed because OTLP is not yet at v1 and may therefore introduce backwards-incompatible changes.
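
To make the ContentType layout concrete, here is a minimal sketch of how a receiver might split such a string into its media type, optional compression suffix, and type parameter. The class `TelemetryContentType` and its fields are hypothetical names chosen for this example; the parsing is deliberately simplistic and assumes exactly one `type=` parameter after the semicolon.

```java
import java.util.Optional;

/** Hypothetical value class for a parsed PushTelemetryRequest ContentType. */
final class TelemetryContentType {
    final String mediaType;             // e.g. "application/x-protobuf"
    final Optional<String> compression; // e.g. "zstd", empty if uncompressed
    final String format;                // value of the type parameter

    private TelemetryContentType(String mediaType, Optional<String> compression, String format) {
        this.mediaType = mediaType;
        this.compression = compression;
        this.format = format;
    }

    /** Parses strings shaped like "application/x-protobuf+zstd;type=otlp+metrics08". */
    static TelemetryContentType parse(String contentType) {
        String[] parts = contentType.split(";", 2);
        if (parts.length != 2 || !parts[1].trim().startsWith("type=")) {
            throw new IllegalArgumentException("Missing type parameter: " + contentType);
        }
        String format = parts[1].trim().substring("type=".length());
        if (format.isEmpty()) {
            throw new IllegalArgumentException("Empty type parameter: " + contentType);
        }
        String base = parts[0].trim();
        // A "+codec" suffix on the media type, if present, names the compression.
        int plus = base.indexOf('+');
        if (plus < 0) {
            return new TelemetryContentType(base, Optional.empty(), format);
        }
        return new TelemetryContentType(
                base.substring(0, plus), Optional.of(base.substring(plus + 1)), format);
    }

    public static void main(String[] args) {
        TelemetryContentType parsed = parse("application/x-protobuf+zstd;type=otlp+metrics08");
        System.out.println(parsed.mediaType + " / " + parsed.compression + " / " + parsed.format);
    }
}
```

Keeping the format version in its own field lets a receiver reject payloads whose type parameter it does not understand without touching the payload bytes.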
...
Code Block |
---|
/** * A {@link MetricsReporter} may implement this interface to indicate support for collecting client telemetry on the server side. */ @InterfaceStability.Evolving public interface ClientTelemetry { /** * Called by the broker to create a ClientTelemetryReceiver instance. * This instance may be cached by the broker. * * This method will always be called after the initial call to * {@link MetricsReporter#contextChange(MetricsContext)} * on the MetricsReporter implementing this interface. * * @return the receiver used to export client telemetry */ ClientTelemetryReceiver clientReceiver(); } @InterfaceStability.Evolving public interface ClientTelemetryReceiver { /** * Called by the broker when a client reports telemetry. The associated request context can be used * by the plugin to retrieve additional client information such as client ids or endpoints. * * This method may be called from the request handling thread, and as such should avoid blocking. * * @param context the client request context for the corresponding PushTelemetryRequest api call * @param payload the encoded telemetry payload as sent by the client */ void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload); /** * Returns the list of accepted content types for client metric payloads supported by this receiver. * This method may be used by the broker to inform connecting clients of supported payload formats. * * <p>e.g. * <code>application/x-protobuf;type=otlp+metrics08</code> * </p> * * @return collection of supported mime types */ Collection<String> acceptedContentTypes(); } @InterfaceStability.Evolving public interface ClientTelemetryPayload { String clientInstanceId(); String contentType(); ByteBuffer data(); } |
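
Because exportMetrics may run on the request handling thread, one possible way for a plugin to avoid blocking is to hand payloads to a bounded queue and process them elsewhere. The sketch below illustrates that idea only. `TelemetryPayload` is a local stand-in mirroring ClientTelemetryPayload so the example compiles without Kafka on the classpath, the request context argument is omitted, and `QueueingTelemetryReceiver` with its methods is a hypothetical name, not part of the proposed interface.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Local stand-in mirroring the ClientTelemetryPayload accessors (assumption for this sketch). */
interface TelemetryPayload {
    String clientInstanceId();
    String contentType();
    ByteBuffer data();
}

/** Hypothetical receiver that never blocks the calling thread. */
final class QueueingTelemetryReceiver {
    private final BlockingQueue<TelemetryPayload> pending;

    QueueingTelemetryReceiver(int capacity) {
        this.pending = new ArrayBlockingQueue<>(capacity);
    }

    /**
     * Counterpart of exportMetrics(context, payload) without the context.
     * offer() returns immediately; a full queue means the payload is dropped.
     *
     * @return true if the payload was queued, false if it was dropped
     */
    boolean exportMetrics(TelemetryPayload payload) {
        return pending.offer(payload);
    }

    /** Intended for a separate exporter thread; returns null if nothing is queued. */
    TelemetryPayload poll() {
        return pending.poll();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Dropping on overflow trades completeness for request latency; a real plugin might instead count drops or apply its own back-pressure policy.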
...
The PushTelemetryRequest.ContentType must then be set to application/x-protobuf+zstd;type=otlp+metrics08 (or any of the other Kafka-supported compression codecs: gzip, snappy, lz4, zstd).
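
As a rough illustration of pairing a compressed payload with its ContentType, the sketch below uses gzip, because the Java standard library has no zstd codec. The `+gzip` content type string is an assumption formed by analogy with the `+zstd` example above, and `CompressedMetrics` with its methods is a hypothetical helper, not an API from this proposal.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper pairing compressed metrics bytes with a ContentType string. */
final class CompressedMetrics {
    private CompressedMetrics() {}

    /** Builds the ContentType; codec may be null or empty for uncompressed payloads. */
    static String contentType(String codec) {
        String base = "application/x-protobuf";
        if (codec != null && !codec.isEmpty()) {
            base += "+" + codec; // assumed naming, mirroring "+zstd"
        }
        return base + ";type=otlp+metrics08";
    }

    /** Compresses already-serialized metrics bytes with gzip. */
    static byte[] gzip(byte[] serialized) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(serialized);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Reverses gzip(), as a receiver would before decoding the payload. */
    static byte[] gunzip(byte[] compressed) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```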
...