...
Metric payloads are encoded as OpenTelemetry MetricsData v1 protobuf objects.
Naming
...
While clients must support both temporalities, the broker will initially only send GetTelemetrySubscriptionsResponse.DeltaTemporality=True. Configuration properties or extensions to the Metrics plugin interface on the broker to change the temporality are outside the scope of this KIP and may be addressed at a later time as the need arises.
See OTLP specification for more information on temporality.
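The broker initially requests delta temporality, but a client may track its counters cumulatively, so the client has to convert between the two. Below is a minimal Java sketch under some assumptions. Counters are plain long values keyed by metric name, and a drop in value is treated as a counter reset. TemporalityConverter and valueForPush are hypothetical names. A real OTLP exporter would also have to adjust the start timestamp of each data point.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: turns cumulative counter readings into the value to push,
// depending on GetTelemetrySubscriptionsResponse.DeltaTemporality.
final class TemporalityConverter {
    private final boolean deltaTemporality;
    // Last cumulative value pushed for each metric name (assumed key).
    private final Map<String, Long> lastReported = new HashMap<>();

    TemporalityConverter(boolean deltaTemporality) {
        this.deltaTemporality = deltaTemporality;
    }

    /** Returns the value to place in the next push for this counter. */
    long valueForPush(String metricName, long cumulativeValue) {
        if (!deltaTemporality) {
            return cumulativeValue; // cumulative: report the running total unchanged
        }
        Long previous = lastReported.put(metricName, cumulativeValue);
        if (previous == null || cumulativeValue < previous) {
            // First observation, or the counter was reset (assumption): report the whole value.
            return cumulativeValue;
        }
        return cumulativeValue - previous; // delta since the previous push
    }
}
```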
...
Defining standard and required metrics makes the monitoring and troubleshooting of clients from various client types easier because the operator can combine the same metric from all client types. For example, request latencies for all client types are reported in a consistent way. Care has been taken to make these standard metrics as generic as possible, and they should fit most Kafka client implementations.
...
A client that supports this metric interface and identifies a supporting broker (by detecting at least GetTelemetrySubscriptionsRequestV0 in the ApiVersionResponse) will start off by sending a GetTelemetrySubscriptionsRequest with the ClientInstanceId field set to Null to one randomly selected connected broker. The response provides the client instance id, the subscribed metrics, the push interval, the accepted compression types, and so on. This handshake with a Null ClientInstanceId is only performed once for a client instance's lifetime. Subsequent GetTelemetrySubscriptionsRequests must include the ClientInstanceId returned in the first response, regardless of broker.
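The handshake rules above can be sketched as a small piece of client state. This is an illustration under stated assumptions, not Kafka's implementation. SubscriptionRequest and TelemetryHandshake are hypothetical names. java.util.UUID stands in for Kafka's Uuid type. The broker's advertised APIs are modelled as a set of names rather than the numeric API keys carried in the real ApiVersionResponse.

```java
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified request model; the real request carries more fields.
final class SubscriptionRequest {
    final UUID clientInstanceId; // null only on the very first request
    SubscriptionRequest(UUID clientInstanceId) {
        this.clientInstanceId = clientInstanceId;
    }
}

// Hypothetical client-side handshake state.
final class TelemetryHandshake {
    private UUID clientInstanceId; // assigned by the first response, kept for the client's lifetime

    /** Telemetry is only attempted against brokers that advertise the API. */
    static boolean brokerSupportsTelemetry(Set<String> advertisedApis) {
        return advertisedApis.contains("GetTelemetrySubscriptions");
    }

    SubscriptionRequest nextRequest() {
        // Null until the first response arrives; afterwards always the assigned id,
        // whichever broker the request is sent to.
        return new SubscriptionRequest(clientInstanceId);
    }

    void onResponse(UUID assignedInstanceId) {
        if (clientInstanceId == null) {
            clientInstanceId = assignedInstanceId; // the Null-id handshake happens only once
        }
    }

    UUID clientInstanceId() {
        return clientInstanceId;
    }
}
```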
...
- KafkaAdminClient: the "admin client runnable" thread
  - KafkaAdminClient creates a dedicated thread to execute the AdminClientRunnable inner class.
  - The AdminClientRunnable's processRequests method loops, polling for network requests via NetworkClient's poll method.
- KafkaConsumer: both the "heartbeat" and application threads
  - KafkaConsumer creates a ConsumerNetworkClient (which wraps a NetworkClient) for network communication.
  - KafkaConsumer also creates a ConsumerCoordinator to manage consumer group members.
  - The ConsumerCoordinator is provided with a reference to the ConsumerNetworkClient instance from the KafkaConsumer.
  - ConsumerCoordinator (via its AbstractCoordinator superclass) maintains an inner thread class named HeartbeatThread.
  - The HeartbeatThread's run method loops and invokes the ConsumerNetworkClient's poll method.
  - The KafkaConsumer's poll method, invoked by the caller on an application thread, also invokes the ConsumerNetworkClient's poll method.
  - Thus, when either the heartbeat thread runs or the application thread polls for new records, the internal NetworkClient's poll method is invoked.
  - Synchronization is performed by the ConsumerNetworkClient to make sure two threads don't access the inner NetworkClient concurrently.
  - With the consumer threading refactor that is underway, the KafkaConsumer will instead use its "background" thread.
- KafkaProducer: the "sender" thread
  - The KafkaProducer creates a Sender to run in a dedicated thread to manage produce requests.
  - The Sender is provided with a reference to the NetworkClient instance from the KafkaProducer.
  - The Sender's run method loops, calling a method named runOnce in each pass.
  - The runOnce method polls for network requests via NetworkClient's poll method.
Connection selection
The client may send telemetry requests to any broker, but shall prefer using an already available connection rather than creating a new connection to keep the number of cluster connections down.
It should also keep using the same broker connection for telemetry requests until the connection goes down. At that point it may choose to reconnect and continue using the same broker, or switch over to another broker connection. Using a persistent connection for PushTelemetryRequests is important so that the receiving broker can properly throttle metrics. It also avoids maintaining metrics state for the client instance id on multiple brokers.
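The connection-selection rules can be sketched as a sticky selector. This is a minimal sketch with several assumptions. Brokers are identified by plain strings. The caller supplies the currently connected brokers and the full broker list. When the current connection drops, the selector switches to another broker, which is one of the two options the text allows. TelemetryConnectionSelector and select are hypothetical names.

```java
import java.util.List;
import java.util.Optional;
import java.util.Random;

// Hypothetical sticky selector for the broker that receives telemetry requests.
final class TelemetryConnectionSelector {
    private final Random random;
    private String currentBroker; // broker currently used for telemetry, if any

    TelemetryConnectionSelector(Random random) {
        this.random = random;
    }

    /**
     * @param connectedBrokers brokers with an already established connection
     * @param allBrokers       every broker known from cluster metadata
     */
    Optional<String> select(List<String> connectedBrokers, List<String> allBrokers) {
        // Keep using the same broker for as long as its connection is up.
        if (currentBroker != null && connectedBrokers.contains(currentBroker)) {
            return Optional.of(currentBroker);
        }
        // Otherwise prefer an existing connection over opening a new one.
        List<String> candidates = connectedBrokers.isEmpty() ? allBrokers : connectedBrokers;
        if (candidates.isEmpty()) {
            currentBroker = null;
            return Optional.empty();
        }
        currentBroker = candidates.get(random.nextInt(candidates.size()));
        return Optional.of(currentBroker);
    }
}
```

Staying on one broker until its connection drops keeps throttling and per-client metrics state on a single broker, which is the reason the text gives for a persistent connection.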
...
In the event that the client's metric subscription has changed and the final metrics push fails with error code UNKNOWN_SUBSCRIPTION_ID, the terminating client has two options. It can obtain a new subscription ID by sending a GetTelemetrySubscriptionsRequest and then immediately sending a PushTelemetryRequest with the Terminating flag set to true, or it can abandon sending a final metrics push.
The metrics should contain the reason for the client termination by including the client.terminating metric with the label “reason” set to a human-readable explanation of why the client is being shut down, such as “Fatal error: Static consumer fenced by newer instance” or “Consumer closed by application”.
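The shutdown path can be sketched in Java under simplifying assumptions. The two RPCs are abstracted behind a hypothetical TelemetryRpc interface, push outcomes are reduced to three hypothetical results, and the client.terminating labels are passed as a plain map. The sketch takes the "refresh and retry once" option. Abandoning the final push would simply skip the retry.

```java
import java.util.Map;

// Hypothetical model of the final push on client shutdown.
final class TerminatingPush {
    // Push outcomes reduced to what this sketch needs (hypothetical).
    enum PushResult { OK, UNKNOWN_SUBSCRIPTION_ID, OTHER_ERROR }

    // Hypothetical abstraction over the two telemetry RPCs.
    interface TelemetryRpc {
        int getSubscriptionId();                      // GetTelemetrySubscriptionsRequest
        PushResult push(int subscriptionId, boolean terminating,
                        Map<String, String> labels);  // PushTelemetryRequest
    }

    /** Sends the final push, retrying once with a fresh subscription id if it is stale. */
    static boolean sendFinalPush(TelemetryRpc rpc, int subscriptionId, String reason) {
        // Labels for the client.terminating metric: a human-readable "reason".
        Map<String, String> labels = Map.of("reason", reason);
        PushResult result = rpc.push(subscriptionId, true, labels);
        if (result == PushResult.UNKNOWN_SUBSCRIPTION_ID) {
            // Subscription changed: fetch a new id and push again immediately.
            result = rpc.push(rpc.getSubscriptionId(), true, labels);
        }
        return result == PushResult.OK;
    }
}
```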
Error handling
The following error codes can be returned in PushTelemetryResponse. The table lists the action to be taken by the client if GetTelemetrySubscriptionsResponse.ErrorCode or PushTelemetryResponse.ErrorCode is set to a non-zero value.
Error code | Reason | Client action |
---|---|---|
| | Client sent a PushTelemetryRequest when it has already sent a Terminating request. | Log an error and stop pushing metrics. This is viewed as a problem in the client implementation of metrics serialization that is not likely to be resolved by retrying. |
| | Broker failed to decode or validate the client’s encoded metrics. | Log an error and stop pushing metrics. This is viewed as a problem in the client implementation of metrics serialization that is not likely to be resolved by retrying. |
| | Client sent a PushTelemetryRequest larger than the maximum size the broker will accept. | Reduce the size of the metrics payload so its size does not exceed GetTelemetrySubscriptionsResponse.TelemetryMaxBytes. |
UNKNOWN_SUBSCRIPTION_ID (NEW) | Client sent a PushTelemetryRequest with an invalid or outdated SubscriptionId. The configured subscriptions have changed. | Immediately send a GetTelemetrySubscriptionsRequest to update the client's subscriptions and get a new SubscriptionId. |
| | Client’s compression type is not supported by the broker. | Immediately send a GetTelemetrySubscriptionsRequest to get an up-to-date list of the broker's supported compression types (and any subscription changes). |
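The client actions in the error table collapse into three behaviours: stop, shrink, or refresh the subscription. The Java sketch below encodes that mapping. Only UNKNOWN_SUBSCRIPTION_ID is a name taken from the table. The other TelemetryError constants, ClientAction, and TelemetryErrorHandler are hypothetical labels for the table rows, not Kafka's error-code names.

```java
// Hypothetical labels for the rows of the error table (only
// UNKNOWN_SUBSCRIPTION_ID matches a name used in the table itself).
enum TelemetryError {
    PUSH_AFTER_TERMINATING,
    INVALID_METRICS,
    PAYLOAD_TOO_LARGE,
    UNKNOWN_SUBSCRIPTION_ID,
    UNSUPPORTED_COMPRESSION
}

enum ClientAction {
    STOP_PUSHING,         // log an error; retrying is not expected to help
    REDUCE_PAYLOAD,       // shrink the payload below TelemetryMaxBytes
    REFRESH_SUBSCRIPTION  // immediately send GetTelemetrySubscriptionsRequest
}

final class TelemetryErrorHandler {
    static ClientAction actionFor(TelemetryError error) {
        switch (error) {
            case PUSH_AFTER_TERMINATING:
            case INVALID_METRICS:
                return ClientAction.STOP_PUSHING;
            case PAYLOAD_TOO_LARGE:
                return ClientAction.REDUCE_PAYLOAD;
            case UNKNOWN_SUBSCRIPTION_ID:
            case UNSUPPORTED_COMPRESSION:
                return ClientAction.REFRESH_SUBSCRIPTION;
            default:
                throw new IllegalArgumentException("Unhandled error: " + error);
        }
    }
}
```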
...
The broker should add additional metrics labels to help identify the client instance before forwarding the metrics to an external system. These are labels such as the namespace, cluster id, client principal, client source address and port, etc. Because the metrics plugin may need to add further metrics information on top of this, the generic metrics receiver in the broker will not add these labels but will rely on the plugins to do so. This avoids deserializing and serializing the received metrics multiple times in the broker.
See Broker-added labels below for the list of labels that should be added by the plugin.
The broker only reports support for GetTelemetrySubscriptions and PushTelemetry requests in its ApiVersionResponse if it has a MetricsReporter that implements the ClientTelemetry interface. This means that clients will not attempt to push metrics to brokers that are not capable of receiving them. If there is no client metrics receiver plugin configured on the broker, the client will not send GetTelemetrySubscriptions or PushTelemetry RPCs, and existing behavior is preserved.
PushTelemetryRequest handling
...
```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

// Each public interface below would live in its own source file.

/**
 * A {@link MetricsReporter} may implement this interface to indicate support for collecting client telemetry on the server side
 */
@InterfaceStability.Evolving
public interface ClientTelemetry {
    /**
     * Called by the broker to create a ClientTelemetryReceiver instance.
     * This instance may be cached by the broker.
     *
     * This method will always be called after the initial call to
     * {@link MetricsReporter#contextChange(MetricsContext)}
     * on the MetricsReporter implementing this interface.
     *
     * @return the receiver that the broker hands client telemetry pushes to
     */
    ClientTelemetryReceiver clientReceiver();
}

@InterfaceStability.Evolving
public interface ClientTelemetryReceiver {
    /**
     * Called by the broker when a client reports telemetry. The associated request context can be used
     * by the plugin to retrieve additional client information such as client ids or endpoints.
     *
     * This method may be called from the request handling thread, and as such should avoid blocking.
     *
     * @param context the client request context for the corresponding PushTelemetryRequest api call
     * @param payload the encoded telemetry payload as sent by the client
     */
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);
}

@InterfaceStability.Evolving
public interface ClientTelemetryPayload {
    /**
     * Client's instance id.
     */
    Uuid clientInstanceId();

    /**
     * Indicates whether the client is terminating and sending its last metrics push.
     */
    boolean isTerminating();

    /**
     * Metrics data content-type / serialization format.
     * Currently "application/x-protobuf;type=otlp+metrics0.19"
     */
    String contentType();

    /**
     * Serialized uncompressed metrics data.
     */
    ByteBuffer data();
}
```
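exportMetrics may run on a request handling thread and should avoid blocking. One common way to meet that requirement is to hand payloads to a bounded queue that a separate exporter thread drains. The sketch below is self-contained so it compiles without Kafka. Payload is a hypothetical stand-in for ClientTelemetryPayload, QueueingReceiver is a hypothetical class, and dropping payloads when the queue is full is an assumed policy. The defensive buffer copy assumes the payload buffer may not stay valid after the call returns.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ClientTelemetryPayload (keeps the sketch Kafka-free).
final class Payload {
    final String clientInstanceId;
    final boolean terminating;
    final ByteBuffer data;

    Payload(String clientInstanceId, boolean terminating, ByteBuffer data) {
        this.clientInstanceId = clientInstanceId;
        this.terminating = terminating;
        this.data = data;
    }
}

// Hypothetical non-blocking receiver: enqueue and return, never wait.
final class QueueingReceiver {
    private final BlockingQueue<Payload> queue;
    private final AtomicLong dropped = new AtomicLong(); // may be called from many handler threads

    QueueingReceiver(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void exportMetrics(Payload payload) {
        // Copy the bytes without moving the caller's buffer position.
        ByteBuffer copy = ByteBuffer.allocate(payload.data.remaining());
        copy.put(payload.data.duplicate());
        copy.flip();
        // offer() returns immediately; a full queue means the exporter is behind.
        if (!queue.offer(new Payload(payload.clientInstanceId, payload.terminating, copy))) {
            dropped.incrementAndGet();
        }
    }

    /** Called by the exporter thread; returns null when nothing is queued. */
    Payload poll() {
        return queue.poll();
    }

    long dropped() {
        return dropped.get();
    }
}
```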
...
If the client has not yet requested a client instance id from the broker, this call may block up to the duration of the timeout. If no client instance id can be retrieved within the timeout, an error is returned (such as timeout, feature not supported by broker, auth failure).
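The bounded wait described above maps naturally onto a future that the network thread completes. The sketch below assumes this design. ClientInstanceIdHolder and its methods are hypothetical, and java.util.UUID stands in for Kafka's Uuid. A timeout surfaces as TimeoutException, and a failed handshake (for example, unsupported by the broker or an auth failure) surfaces as ExecutionException wrapping the cause.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical holder for the broker-assigned client instance id.
final class ClientInstanceIdHolder {
    private final CompletableFuture<UUID> instanceId = new CompletableFuture<>();

    /** Called by the network thread when the subscriptions response arrives. */
    void complete(UUID id) {
        instanceId.complete(id);
    }

    /** Called if the handshake fails, e.g. feature not supported or auth failure. */
    void fail(Throwable cause) {
        instanceId.completeExceptionally(cause);
    }

    /** Blocks for at most {@code timeout}; returns at once if the id is already known. */
    UUID clientInstanceId(Duration timeout)
            throws TimeoutException, ExecutionException, InterruptedException {
        return instanceId.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```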
Broker configuration
Configuration | Description | Values |
---|---|---|
telemetry.max.bytes | The maximum size (after compression if compression is used) of telemetry pushed from a client to the broker. | int, default: 1048576, valid values: [1,...] |
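The limit applies to the bytes the client actually sends, so a client should compare the size after compression when compression is used. A sketch follows, with some assumptions. GZIP via java.util.zip stands in for whichever compression type the broker accepts. TelemetrySizeCheck and its methods are hypothetical. The constant mirrors the documented default of 1048576.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical client-side guard against exceeding telemetry.max.bytes.
final class TelemetrySizeCheck {
    static final int DEFAULT_TELEMETRY_MAX_BYTES = 1048576; // documented broker default

    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray(); // complete only after the stream is closed
    }

    /** True if the payload, compressed when requested, fits within maxBytes. */
    static boolean fits(byte[] serializedMetrics, boolean compress, int maxBytes) {
        byte[] wire = compress ? gzip(serializedMetrics) : serializedMetrics;
        return wire.length <= maxBytes;
    }
}
```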
Client configuration
This applies to producers, consumers, admin client, and of course embedded uses of these clients in frameworks such as Kafka Connect.
...
There will be no permission checks on the PushTelemetryRequest itself.
API Request | Resource | ACL operation |
---|---|---|
DescribeConfigs | CLUSTER | DESCRIBE, READ_CONFIGS |
AlterConfigs | CLUSTER | ALTER |
GetTelemetrySubscriptions | N/A | N/A |
PushTelemetry | N/A | N/A |
...