Comment: Update on threading implementation

...

If GetTelemetrySubscriptionsResponse.RequestedMetrics is non-empty but does not match any metrics the client provides, the client should still send PushTelemetryRequests at the indicated PushIntervalMs interval, with an empty metrics blob. This allows a broker-side metrics plugin to differentiate between non-responsive or buggy clients and clients that simply have no metrics matching the subscription set.
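The rule above can be sketched as a small selection step. This is a minimal illustration, not the client's actual code: the class and method names (TelemetryPayloads, selectMetrics) are hypothetical, and the sketch assumes that a requested entry matches a metric by name prefix and that an empty requested list means there is no subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper for illustration; not part of the Kafka client API.
final class TelemetryPayloads {
    private TelemetryPayloads() {}

    /**
     * Decides what the next push should carry.
     * Optional.empty() means no push at all (assumption: an empty requested list is "no subscription").
     * An empty list means the subscription matched nothing, so an empty metrics blob is still pushed.
     * Assumption: a requested entry matches a metric when it is a prefix of the metric name.
     */
    static Optional<List<String>> selectMetrics(List<String> requestedMetrics,
                                                List<String> availableMetrics) {
        if (requestedMetrics.isEmpty()) {
            return Optional.empty();
        }
        List<String> selected = new ArrayList<>();
        for (String name : availableMetrics) {
            for (String prefix : requestedMetrics) {
                if (name.startsWith(prefix)) {
                    selected.add(name);
                    break; // avoid adding the same metric twice
                }
            }
        }
        return Optional.of(selected);
    }
}
```

Returning an empty list rather than skipping the push is what lets the broker side tell a client with nothing to report apart from a client that has gone silent.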

Threading

The client re-uses the existing threads that are used for network communication. The existing logic in the poll method of the NetworkClient class issues internal network requests for metadata when needed. The NetworkClient class has been augmented to also issue internal network requests for telemetry, based on the push interval (PushIntervalMs) in the subscription.

The threads that invoke the NetworkClient's poll method are:

  • KafkaAdminClient: the "admin client runnable" thread
    • KafkaAdminClient creates a dedicated thread to execute the AdminClientRunnable inner class
      • The AdminClientRunnable's processRequests method loops, polling for network requests via NetworkClient's poll method
  • KafkaConsumer: both the "heartbeat" and application threads
    • KafkaConsumer creates a ConsumerNetworkClient (which wraps a NetworkClient) for network communication
    • KafkaConsumer also creates a ConsumerCoordinator to manage consumer group members
      • The ConsumerCoordinator is provided with a reference to the ConsumerNetworkClient instance from the KafkaConsumer
      • ConsumerCoordinator (via its AbstractCoordinator superclass) maintains an inner thread class named HeartbeatThread
        • The HeartbeatThread's run method loops and invokes the ConsumerNetworkClient's poll method
    • The KafkaConsumer's poll method, invoked by the caller on an application thread, also invokes the ConsumerNetworkClient's poll method
    • Thus when either the heartbeat thread runs or the application thread polls for new records, the internal NetworkClient's poll method is invoked
      • Synchronization is performed by the ConsumerNetworkClient to make sure two threads don't access the inner NetworkClient concurrently
  • KafkaProducer: the "sender" thread
    • The KafkaProducer creates a Sender to run in a dedicated thread to manage produce requests
      • The Sender is provided with a reference to the NetworkClient instance from the KafkaProducer
      • The Sender's run method loops, calling a method named runOnce in each pass
        • The runOnce method polls for network requests via NetworkClient's poll method
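The pattern described in this section, where whichever thread calls poll also drives telemetry, can be sketched as follows. This is a simplified stand-in, not the NetworkClient implementation: the class TelemetryPollSketch and its methods are hypothetical, the caller passes the current time explicitly, and a plain synchronized method stands in for whatever locking the real client uses.

```java
// Hypothetical stand-in for telemetry bookkeeping inside a network client's poll loop.
final class TelemetryPollSketch {
    private final long pushIntervalMs;
    private long lastPushMs;
    private int pushesSent;

    TelemetryPollSketch(long pushIntervalMs, long startMs) {
        this.pushIntervalMs = pushIntervalMs;
        this.lastPushMs = startMs;
    }

    /**
     * Called from any thread that already polls the network (for example a
     * heartbeat thread and an application thread). Synchronized so that two
     * callers never update the telemetry state concurrently.
     *
     * @return true if a telemetry request would be issued on this pass
     */
    synchronized boolean poll(long nowMs) {
        if (nowMs - lastPushMs >= pushIntervalMs) {
            lastPushMs = nowMs;
            pushesSent++;
            return true; // a real client would queue the telemetry request here
        }
        return false;
    }

    synchronized int pushesSent() {
        return pushesSent;
    }
}
```

Because the interval check lives inside poll, no dedicated telemetry thread is needed; the push happens on the first poll call after the interval has elapsed, regardless of which thread makes it.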

Connection selection

The client may send telemetry requests to any broker, but shall prefer an already available connection over creating a new one, to keep the number of cluster connections down.

It should also keep using the same broker connection for telemetry requests until the connection goes down, at which time it may choose to reconnect and continue using the same broker, or switch over to another broker connection. Using a persistent connection for PushTelemetryRequests is important so that metrics throttling can be properly performed by the receiving broker. It also avoids maintaining metrics state for the client instance id on multiple brokers.
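One possible selection policy consistent with the text above is sketched below. It is an illustration only: TelemetryNodeSelector is a hypothetical class, brokers are identified by integer ids, and picking the lowest id among candidates is an arbitrary choice made to keep the example deterministic.

```java
import java.util.Collections;
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical sticky broker selection for telemetry requests.
final class TelemetryNodeSelector {
    private OptionalInt current = OptionalInt.empty();

    /**
     * @param connected ids of brokers with an already available connection
     * @param known     ids of all known brokers, used only if none are connected
     * @return the broker to use for the next telemetry request, if any
     */
    OptionalInt select(Set<Integer> connected, Set<Integer> known) {
        // Stay on the current broker for as long as its connection is up.
        if (current.isPresent() && connected.contains(current.getAsInt())) {
            return current;
        }
        if (!connected.isEmpty()) {
            // Connection lost or first call: prefer an existing connection.
            current = OptionalInt.of(Collections.min(connected));
        } else if (!(current.isPresent() && known.contains(current.getAsInt()))) {
            // Nothing connected and no usable previous broker: pick any known one.
            current = known.isEmpty()
                    ? OptionalInt.empty()
                    : OptionalInt.of(Collections.min(known));
        }
        // Otherwise keep the current broker and let the caller reconnect to it.
        return current;
    }
}
```

Keeping the choice sticky means the receiving broker sees every push from a given client instance, which is what allows it to throttle against PushIntervalMs.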

Client termination

When a client is being shut down it should send its final metrics regardless of the PushIntervalMs time, but only if the client has an active metrics subscription.
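The termination rule combines with the regular interval check roughly as follows. This is a minimal sketch with hypothetical names (PushDecision, shouldPush); the parameters are assumptions introduced for illustration.

```java
// Hypothetical decision helper; not part of the Kafka client API.
final class PushDecision {
    private PushDecision() {}

    /**
     * @param hasActiveSubscription whether the client currently has a metrics subscription
     * @param closing               whether the client is being shut down
     * @param nowMs                 current time in milliseconds
     * @param lastPushMs            time of the previous push in milliseconds
     * @param pushIntervalMs        PushIntervalMs from the subscription
     */
    static boolean shouldPush(boolean hasActiveSubscription, boolean closing,
                              long nowMs, long lastPushMs, long pushIntervalMs) {
        if (!hasActiveSubscription) {
            return false; // nothing is pushed without a subscription, even on close
        }
        if (closing) {
            return true; // the final push ignores the interval
        }
        return nowMs - lastPushMs >= pushIntervalMs;
    }
}
```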

...

The following new broker metrics should be added:

| Metric Name | Type | Labels | Description |
|---|---|---|---|
| ClientMetricsInstanceCount | Gauge | broker_id | Current number of client metric instances being managed by this broker, e.g., the number of unique CLIENT_INSTANCE_IDs with an empty or non-empty subscription set. |
| ClientMetricsNewInstanceCount | Sum | broker_id | Total number of GetTelemetrySubscriptionsRequests received by this broker. |
| ClientMetricsUnknownInstanceCount | Sum | broker_id | Total number of GetTelemetrySubscriptionsRequests with unknown CLIENT_INSTANCE_IDs. |
| ClientMetricsThrottleCount | Sum | broker_id | Total number of PushTelemetryRequests throttled because the PushTelemetryRequest rate was higher than the allowed PushIntervalMs. |
| ClientMetricsPluginExportCount | Sum | broker_id | The total number of metrics requests being pushed to metrics plugins, e.g., the number of exportMetrics() calls. |
| ClientMetricsPluginExportTimeMs | Sum | broker_id | The amount of time plugins spent handling pushed metrics, e.g., the amount of time spent in exportMetrics(). |
| ClientMetricsPluginErrorCount | Sum | broker_id | The total number of exceptions raised from a plugin's exportMetrics(). |


Client metrics and metric labels

...