...
If GetTelemetrySubscriptionsResponse.RequestedMetrics is non-empty but does not match any metrics the client provides, the client should still send PushTelemetryRequests at the indicated PushIntervalMs interval, with an empty metrics blob. This lets a broker-side metrics plugin differentiate between non-responsive or buggy clients and clients that simply have no metrics matching the subscription set.
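The rule above separates two decisions: which metrics go into the payload, and whether a push is due at all. The sketch below illustrates that split. The class PushPayloadSelector is hypothetical, and treating RequestedMetrics entries as metric-name prefixes is an assumption of this sketch rather than something this section specifies.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; prefix matching of RequestedMetrics is an assumption of this sketch.
final class PushPayloadSelector {

    private PushPayloadSelector() {}

    // Returns the provided metric names that match at least one requested prefix.
    // An empty result is a valid payload: it becomes an empty metrics blob.
    static List<String> select(List<String> requestedMetrics, Set<String> providedMetrics) {
        return providedMetrics.stream()
                .filter(name -> requestedMetrics.stream().anyMatch(name::startsWith))
                .sorted()
                .collect(Collectors.toList());
    }

    // A non-empty subscription means "push every PushIntervalMs", even when
    // select(...) matches nothing; only an empty subscription suppresses pushes.
    static boolean shouldPush(List<String> requestedMetrics) {
        return !requestedMetrics.isEmpty();
    }
}
```

With a subscription for producer metrics on a consumer-only client, select(...) returns an empty list while shouldPush(...) still returns true, so the broker keeps receiving empty pushes on schedule.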
Threading
The client will reuse the existing threads that are used for network communication. The existing logic in the poll method of the NetworkClient class issues internal network requests for metadata when needed. The NetworkClient class has been augmented to also issue internal network requests for telemetry, based on the push interval (PushIntervalMs) in the subscription.
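A simplified sketch of how a telemetry request can piggyback on an existing poll loop is shown below. PollingClientSketch is a hypothetical class, not the real NetworkClient; it only counts the requests it would issue, and it assumes the push interval has already been taken from the subscription. The key point is that poll never blocks past the next telemetry deadline, so no extra thread is needed.

```java
// Hypothetical, simplified poll loop; not the real NetworkClient implementation.
final class PollingClientSketch {
    private final long pushIntervalMs;   // assumed to come from the subscription's PushIntervalMs
    private long nextTelemetryPushMs;
    private boolean metadataStale = false;
    private int metadataRequestsSent = 0;
    private int telemetryRequestsSent = 0;

    PollingClientSketch(long pushIntervalMs, long nowMs) {
        this.pushIntervalMs = pushIntervalMs;
        this.nextTelemetryPushMs = nowMs + pushIntervalMs;
    }

    void requestMetadataUpdate() {
        metadataStale = true;
    }

    // Invoked from the client's existing network thread(s).
    // Returns how long the caller may block before polling again.
    long poll(long timeoutMs, long nowMs) {
        if (metadataStale) {
            metadataRequestsSent++;      // existing behaviour: internal metadata request
            metadataStale = false;
        }
        if (nowMs >= nextTelemetryPushMs) {
            telemetryRequestsSent++;     // added behaviour: internal telemetry request
            nextTelemetryPushMs = nowMs + pushIntervalMs;
        }
        // Cap the wait so the next telemetry deadline is not overslept.
        return Math.min(timeoutMs, Math.max(0, nextTelemetryPushMs - nowMs));
    }

    int metadataRequestsSent() { return metadataRequestsSent; }
    int telemetryRequestsSent() { return telemetryRequestsSent; }
}
```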
The threads that invoke the NetworkClient's poll method are:
- KafkaAdminClient: the "admin client runnable" thread
  - KafkaAdminClient creates a dedicated thread to execute the AdminClientRunnable inner class
  - The AdminClientRunnable's processRequests method loops, polling for network requests via the NetworkClient's poll method
- KafkaConsumer: both the "heartbeat" and application threads
  - KafkaConsumer creates a ConsumerNetworkClient (which wraps a NetworkClient) for network communication
  - KafkaConsumer also creates a ConsumerCoordinator to manage consumer group members
  - The ConsumerCoordinator is provided with a reference to the ConsumerNetworkClient instance from the KafkaConsumer
  - The ConsumerCoordinator (via its AbstractCoordinator superclass) maintains an inner thread class named HeartbeatThread
  - The HeartbeatThread's run method loops and invokes the ConsumerNetworkClient's poll method
  - The KafkaConsumer's poll method, invoked by the caller on an application thread, also invokes the ConsumerNetworkClient's poll method
  - Thus, when either the heartbeat thread runs or the application thread polls for new records, the internal NetworkClient's poll method is invoked
  - Synchronization is performed by the ConsumerNetworkClient to make sure two threads don't access the inner NetworkClient concurrently
- KafkaProducer: the "sender" thread
  - The KafkaProducer creates a Sender to run in a dedicated thread to manage produce requests
  - The Sender is provided with a reference to the NetworkClient instance from the KafkaProducer
  - The Sender's run method loops, calling a method named runOnce in each pass
  - The runOnce method polls for network requests via the NetworkClient's poll method
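Because the consumer's heartbeat and application threads share one inner client, telemetry requests issued from inside poll inherit whatever serialization the wrapper already provides. The sketch below illustrates that pattern with a ReentrantLock. Both classes are hypothetical stand-ins, not ConsumerNetworkClient or NetworkClient, and the lock choice is an assumption of this sketch; UnsafeInnerClient merely records how many callers were ever inside poll at once.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a non-thread-safe inner network client.
final class UnsafeInnerClient {
    private final AtomicInteger callersInside = new AtomicInteger();
    private final AtomicInteger maxCallersInside = new AtomicInteger();
    private int polls = 0; // deliberately unsynchronized: relies on the wrapper's lock

    void poll() {
        int now = callersInside.incrementAndGet();
        maxCallersInside.accumulateAndGet(now, Math::max);
        polls++; // any due metadata or telemetry request would be issued here
        callersInside.decrementAndGet();
    }

    int polls() { return polls; }
    int maxCallersInside() { return maxCallersInside.get(); }
}

// Hypothetical wrapper that serializes access, so the heartbeat thread and the
// application thread never run the inner client's poll concurrently.
final class SerializingClient {
    private final ReentrantLock lock = new ReentrantLock();
    private final UnsafeInnerClient inner;

    SerializingClient(UnsafeInnerClient inner) {
        this.inner = inner;
    }

    void poll() {
        lock.lock();
        try {
            inner.poll();
        } finally {
            lock.unlock();
        }
    }
}
```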
Connection selection
The client may send telemetry requests to any broker, but shall prefer using an already available connection rather than creating a new one, to keep the number of cluster connections down.
It should also keep using the same broker connection for telemetry requests until the connection goes down, at which point it may choose to reconnect and continue using the same broker, or switch over to another broker connection. Using a persistent connection for PushTelemetryRequests is important so that the receiving broker can properly throttle metrics pushes; it also avoids maintaining metrics state for the client instance id on multiple brokers.
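One possible selection policy consistent with these rules is sketched below: stay on the current broker while its connection is up, otherwise switch to any existing connection, and only then open a new one (preferring to reconnect to the previous broker). TelemetryNodeSelector and its string broker ids are hypothetical; the ordering between "switch to another existing connection" and "reconnect to the same broker" is a design choice the text leaves open.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical broker selector for telemetry requests; broker ids are plain strings here.
final class TelemetryNodeSelector {
    private String currentNode; // broker used for the previous telemetry request, if any

    // connectedNodes: brokers with an already-established connection
    // knownNodes: all brokers known from metadata
    Optional<String> select(List<String> connectedNodes, List<String> knownNodes) {
        // 1. Keep using the same broker while its connection is up.
        if (currentNode != null && connectedNodes.contains(currentNode)) {
            return Optional.of(currentNode);
        }
        String chosen;
        if (!connectedNodes.isEmpty()) {
            // 2. Prefer an existing connection over creating a new one.
            chosen = connectedNodes.get(0);
        } else if (currentNode != null && knownNodes.contains(currentNode)) {
            // 3. No live connection: reconnect to the previous broker if it still exists.
            chosen = currentNode;
        } else if (!knownNodes.isEmpty()) {
            // 4. Otherwise open a connection to any known broker.
            chosen = knownNodes.get(0);
        } else {
            return Optional.empty();
        }
        currentNode = chosen;
        return Optional.of(chosen);
    }
}
```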
Client termination
When a client is being shut down, it should send its final metrics regardless of the PushIntervalMs interval, but only if the client has an active metrics subscription.
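The shutdown rule amounts to one extra condition on the push decision: close() bypasses the interval check but not the subscription check. TelemetryShutdownSketch is a hypothetical class, and modelling "active metrics subscription" as a single boolean is an assumption of this sketch.

```java
// Hypothetical client-side telemetry state illustrating the shutdown rule.
final class TelemetryShutdownSketch {
    private final boolean activeSubscription; // assumption: reduced to a flag for this sketch
    private final long pushIntervalMs;
    private long lastPushMs;
    private int pushesSent = 0;

    TelemetryShutdownSketch(boolean activeSubscription, long pushIntervalMs, long startMs) {
        this.activeSubscription = activeSubscription;
        this.pushIntervalMs = pushIntervalMs;
        this.lastPushMs = startMs;
    }

    // Regular operation: push only when subscribed and the interval has elapsed.
    boolean maybePush(long nowMs) {
        if (!activeSubscription || nowMs - lastPushMs < pushIntervalMs) {
            return false;
        }
        push(nowMs);
        return true;
    }

    // Shutdown: push the final metrics immediately, ignoring PushIntervalMs,
    // but still only if there is an active subscription.
    boolean close(long nowMs) {
        if (!activeSubscription) {
            return false;
        }
        push(nowMs);
        return true;
    }

    private void push(long nowMs) {
        pushesSent++;
        lastPushMs = nowMs;
    }

    int pushesSent() { return pushesSent; }
}
```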
...
The following new broker metrics should be added:
Metric Name | Type | Labels | Description |
---|---|---|---|
ClientMetricsInstanceCount | Gauge | broker_id | Current number of client metric instances being managed by this broker. E.g., the number of unique CLIENT_INSTANCE_IDs with an empty or non-empty subscription set. |
ClientMetricsNewInstanceCount | Sum | broker_id | Total number of GetTelemetrySubscriptionsRequests received by this broker. |
ClientMetricsUnknownInstanceCount | Sum | broker_id | Total number of metrics requests (GetTelemetrySubscriptionsRequests) with unknown CLIENT_INSTANCE_IDs. |
ClientMetricsThrottleCount | Sum | broker_id | Total number of PushTelemetryRequests throttled because they arrived at a higher rate than the allowed PushIntervalMs. |
ClientMetricsPluginExportCount | Sum | broker_id | The total number of metrics requests being pushed to metrics plugins, e.g., the number of exportMetrics() calls. |
ClientMetricsPluginExportTimeMs | Sum | broker_id | The amount of time plugins spent handling pushed metrics, e.g., the amount of time spent in exportMetrics(). |
ClientMetricsPluginErrorCount | Sum | broker_id | The total number of exceptions raised from plugins' exportMetrics() calls. |
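To make the relationship between the push-related counters in this table concrete, the sketch below shows where a broker-side push handler might update them. MetricsExporter and PushTelemetryHandlerSketch are hypothetical names (the actual plugin interface is not defined in this section), and a strict "reject any push sooner than PushIntervalMs" rule is an assumption; a real broker may tolerate some jitter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plugin interface standing in for the broker's metrics plugin.
interface MetricsExporter {
    void exportMetrics(String clientInstanceId, byte[] metrics) throws Exception;
}

// Hypothetical broker-side handler showing where the table's counters are updated.
final class PushTelemetryHandlerSketch {
    private final Map<String, Long> lastPushMs = new HashMap<>();
    private final long pushIntervalMs;
    private final MetricsExporter exporter;
    private long throttleCount = 0;        // ClientMetricsThrottleCount
    private long pluginExportCount = 0;    // ClientMetricsPluginExportCount
    private long pluginExportTimeNanos = 0; // ClientMetricsPluginExportTimeMs (kept in ns)
    private long pluginErrorCount = 0;     // ClientMetricsPluginErrorCount

    PushTelemetryHandlerSketch(long pushIntervalMs, MetricsExporter exporter) {
        this.pushIntervalMs = pushIntervalMs;
        this.exporter = exporter;
    }

    // Returns false if the push was throttled (arrived sooner than PushIntervalMs).
    boolean handlePush(String clientInstanceId, byte[] metrics, long nowMs) {
        Long last = lastPushMs.get(clientInstanceId);
        if (last != null && nowMs - last < pushIntervalMs) {
            throttleCount++;
            return false;
        }
        lastPushMs.put(clientInstanceId, nowMs);
        pluginExportCount++;
        long start = System.nanoTime();
        try {
            exporter.exportMetrics(clientInstanceId, metrics);
        } catch (Exception e) {
            pluginErrorCount++; // plugin failures are counted, not propagated
        } finally {
            pluginExportTimeNanos += System.nanoTime() - start;
        }
        return true;
    }

    long throttleCount() { return throttleCount; }
    long pluginExportCount() { return pluginExportCount; }
    long pluginExportTimeMs() { return pluginExportTimeNanos / 1_000_000; }
    long pluginErrorCount() { return pluginErrorCount; }
}
```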
Client metrics and metric labels
...