...

When a GetTelemetrySubscriptionsRequest is received for a previously unknown client instance id, the CLIENT_METRICS config cache is scanned for configured metric subscriptions whose match selectors match the client. The matching configuration entries are compiled into a list of subscribed metrics. This list is returned in GetTelemetrySubscriptionsResponse.RequestedMetrics together with the minimum configured collection interval. A future version could include a per-metric interval so that each subscribed metric is collected at its own configured interval. In the current form, longer-interval metrics are included “for free” at the shorter interval whenever the subscription set also contains shorter-interval metrics. A CRC32C checksum is also calculated over the compiled metrics. It is used to derive the SubscriptionId returned in the response, and it is stored in the broker's per-client-instance cache to track configuration changes.
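The scan-and-compile step can be sketched as follows. This is a minimal, hypothetical model rather than the broker's implementation. `MetricsConfig`, `CompiledSubscription` and `SubscriptionCompiler` are illustrative names. Selector values are treated as regular expressions that must fully match the client attribute. Two behaviors are assumptions: an entry with no selectors matches every client, and a default interval is used when nothing matches.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical, simplified CLIENT_METRICS entry: metric name prefixes, a push
// interval, and match selectors (selector name -> regular expression).
record MetricsConfig(List<String> metrics, int intervalMs, Map<String, String> match) {}

// What the broker returns: the union of subscribed prefixes (RequestedMetrics)
// and the minimum interval among the matching entries.
record CompiledSubscription(Set<String> requestedMetrics, int pushIntervalMs) {}

final class SubscriptionCompiler {
    private SubscriptionCompiler() {}

    // An entry matches when every selector fully matches the client's attribute.
    // Assumption: an entry without selectors matches every client.
    static boolean matches(MetricsConfig config, Map<String, String> clientAttributes) {
        for (Map.Entry<String, String> selector : config.match().entrySet()) {
            String value = clientAttributes.get(selector.getKey());
            if (value == null || !Pattern.matches(selector.getValue(), value)) {
                return false;
            }
        }
        return true;
    }

    // Scans the config cache; defaultIntervalMs (an assumption) is used when nothing matches.
    static CompiledSubscription compile(List<MetricsConfig> cache,
                                        Map<String, String> clientAttributes,
                                        int defaultIntervalMs) {
        Set<String> metrics = new TreeSet<>(); // sorted, so a checksum over it is deterministic
        int minIntervalMs = Integer.MAX_VALUE;
        for (MetricsConfig config : cache) {
            if (matches(config, clientAttributes)) {
                metrics.addAll(config.metrics());
                minIntervalMs = Math.min(minIntervalMs, config.intervalMs());
            }
        }
        int interval = minIntervalMs == Integer.MAX_VALUE ? defaultIntervalMs : minIntervalMs;
        return new CompiledSubscription(metrics, interval);
    }
}
```

Taking the minimum interval is what makes longer-interval metrics ride along “for free”: every metric in the set is pushed at the shortest interval among the matching entries.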

...

The SubscriptionId is a unique identifier for a client instance's subscription set. It is generated by calculating a CRC32C checksum of the configured metrics subscriptions matching the given client, including the PushIntervalMs, and XORing the result with the ClientInstanceId. The SubscriptionId is returned to the client in the GetTelemetrySubscriptionsResponse, and the client passes it in each subsequent PushTelemetryRequest for the metrics it received. If the configured subscriptions are updated in a way that changes a client instance's subscription set, the SubscriptionId is recalculated. When the next PushTelemetryRequest arrives with the previous SubscriptionId, the broker finds that the received and expected SubscriptionIds differ and returns UNKNOWN_SUBSCRIPTION_ID to the client. On receiving this error code, the client immediately sends a GetTelemetrySubscriptionsRequest to retrieve the new subscription set along with a new SubscriptionId.
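The derivation and the stale-id check can be sketched with the JDK's `java.util.zip.CRC32C`. This sketch rests on several assumptions that are not specified above:

- the byte layout fed into the checksum (sorted metric names with a separator, then the interval as decimal text);
- folding the 128-bit ClientInstanceId to 32 bits with `hashCode()` before the XOR;
- `java.util.UUID` standing in for Kafka's `Uuid`.

`SubscriptionIds` and `validatePush` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.zip.CRC32C;

final class SubscriptionIds {
    private SubscriptionIds() {}

    // CRC32C over the subscribed metrics plus PushIntervalMs, XORed with the instance id.
    static int subscriptionId(Set<String> metrics, int pushIntervalMs, UUID clientInstanceId) {
        CRC32C crc = new CRC32C();
        for (String metric : new TreeSet<>(metrics)) { // sorted, so set iteration order doesn't matter
            crc.update(metric.getBytes(StandardCharsets.UTF_8));
            crc.update(','); // separator so {"ab","c"} and {"a","bc"} hash differently
        }
        crc.update(Integer.toString(pushIntervalMs).getBytes(StandardCharsets.UTF_8));
        // Assumption: fold the 128-bit id to 32 bits via hashCode() before XORing.
        return (int) crc.getValue() ^ clientInstanceId.hashCode();
    }

    // Broker-side check on PushTelemetryRequest: null means the push may proceed,
    // otherwise the error the client must react to by re-fetching its subscription.
    static String validatePush(int receivedId, int expectedId) {
        return receivedId == expectedId ? null : "UNKNOWN_SUBSCRIPTION_ID";
    }
}
```

Because the interval is part of the checksum input, changing only `interval.ms` on a matching configuration also yields a new SubscriptionId.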

...

Code Block
/**
 * @return the client's assigned instance id used for metrics collection.
 * @throws InterruptException       If the thread is interrupted while blocked.
 * @throws TimeoutException         Indicates that a request timed out.
 * @throws KafkaException           If an unexpected error occurs while trying to determine the client
 *                                  instance ID, though this error does not necessarily imply the
 *                                  consumer client is otherwise unusable.
 * @throws IllegalArgumentException If the {@code timeout} is negative.
 * @throws IllegalStateException    If telemetry is not enabled, i.e. config {@code enable.metrics.push}
 *                                  is set to {@code false}.
 */
public Uuid clientInstanceId(Duration timeout);

If the client has not yet requested a client instance id from the broker, this call may block for up to the duration of the timeout. If the client instance id cannot be obtained within the timeout, the method throws org.apache.kafka.common.errors.TimeoutException.
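The blocking contract above can be modeled with a `CompletableFuture` that completes once the broker assigns an id. This is a sketch, not the client's internals. `InstanceIdHolder` and `ClientTimeoutException` are hypothetical, and `ClientTimeoutException` stands in for Kafka's unchecked `TimeoutException`. Interruption and unexpected failures are wrapped in plain `RuntimeException`s, where the real client would throw `InterruptException` and `KafkaException`.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical unchecked stand-in for org.apache.kafka.common.errors.TimeoutException.
class ClientTimeoutException extends RuntimeException {
    ClientTimeoutException(String message) { super(message); }
}

// Hypothetical holder illustrating the documented contract of clientInstanceId(Duration).
final class InstanceIdHolder {
    private final boolean metricsPushEnabled;
    // Completed once a GetTelemetrySubscriptionsResponse assigns the client instance id.
    private final CompletableFuture<UUID> instanceId = new CompletableFuture<>();

    InstanceIdHolder(boolean metricsPushEnabled) { this.metricsPushEnabled = metricsPushEnabled; }

    void onIdAssigned(UUID id) { instanceId.complete(id); }

    UUID clientInstanceId(Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        if (!metricsPushEnabled) {
            throw new IllegalStateException("enable.metrics.push is set to false");
        }
        try {
            // Returns at once if the id is already known; otherwise waits up to `timeout`.
            return instanceId.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (java.util.concurrent.TimeoutException e) {
            throw new ClientTimeoutException("client instance id not obtained within " + timeout);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Kafka would throw InterruptException here
            throw new RuntimeException("interrupted while waiting for client instance id", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("failed to obtain client instance id", e.getCause());
        }
    }
}
```

A zero timeout therefore acts as a non-blocking probe: it returns the id if it is already known and throws the timeout exception otherwise.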

In addition, the following method is added to the KafkaStreams class to give access to the client instance ids of the producers, consumers and admin clients used by Kafka Streams:

Code Block
/**
 * @return The internal clients' assigned instance ids used for metrics collection.
 *
 * @throws IllegalArgumentException If {@code timeout} is negative.
 * @throws IllegalStateException    If {@code KafkaStreams} is not running.
 * @throws TimeoutException         Indicates that a request timed out.
 * @throws StreamsException         For any other error that might occur.
 */
public ClientInstanceIds clientInstanceIds(Duration timeout);

This method is only permitted when Kafka Streams is in state RUNNING or REBALANCING. If Kafka Streams is in any other state, the method throws an IllegalStateException.

If any of the client instance ids cannot be obtained within the timeout, the method throws org.apache.kafka.common.errors.TimeoutException.
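The state check and the single timeout budget can be sketched as follows. All names are hypothetical: `StreamsState` mirrors the names of `KafkaStreams.State`, `InstanceIdsTimeoutException` stands in for Kafka's `TimeoutException`, and each internal client's pending id is modeled as a `CompletableFuture`. The design assumption is that one deadline is shared across all clients, so the call is roughly bounded by `timeout` rather than by `timeout` times the number of clients.

```java
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical enum mirroring the names of KafkaStreams.State.
enum StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

// Hypothetical unchecked stand-in for org.apache.kafka.common.errors.TimeoutException.
class InstanceIdsTimeoutException extends RuntimeException {
    InstanceIdsTimeoutException(String message) { super(message); }
}

final class StreamsInstanceIds {
    private StreamsInstanceIds() {}

    // Only RUNNING or REBALANCING may query the internal clients' instance ids.
    static void checkQueryable(StreamsState state, Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        if (state != StreamsState.RUNNING && state != StreamsState.REBALANCING) {
            throw new IllegalStateException("KafkaStreams is not running, but in state " + state);
        }
    }

    // Each wait uses only the time left until a common deadline.
    static Map<String, UUID> collectAll(Map<String, CompletableFuture<UUID>> pending, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        Map<String, UUID> ids = new TreeMap<>();
        for (Map.Entry<String, CompletableFuture<UUID>> entry : pending.entrySet()) {
            long remainingNanos = Math.max(0L, deadline - System.nanoTime());
            try {
                ids.put(entry.getKey(), entry.getValue().get(remainingNanos, TimeUnit.NANOSECONDS));
            } catch (TimeoutException e) {
                throw new InstanceIdsTimeoutException("no instance id for " + entry.getKey() + " within " + timeout);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted while collecting instance ids", e);
            } catch (ExecutionException e) {
                throw new RuntimeException("failed to obtain instance id for " + entry.getKey(), e.getCause());
            }
        }
        return ids;
    }
}
```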

The new interface org.apache.kafka.streams.ClientInstanceIds is defined as follows:

Code Block
/**
 * Encapsulates the client instance ids used for metrics collection by
 * producers, consumers and admin clients used by Kafka Streams.
 */
public interface ClientInstanceIds {
  /**
   * Get the client instance id of the admin client
   *
   * @return the client instance id
   *
   * @throws IllegalStateException If telemetry is disabled on the admin client.
   */
  Uuid adminInstanceId();

  /**
   * Get the client instance ids of the consumers
   *
   * @return a map from thread key to client instance id
   */
  Map<String, Uuid> consumerInstanceIds();

  /**
   * Get the client instance ids of the producers
   *
   * @return a map from thread key to client instance id
   */
  Map<String, Uuid> producerInstanceIds();
}
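A simple immutable implementation of this interface might look as follows. It is an illustrative sketch only. The interface is re-declared locally with `java.util.UUID` in place of Kafka's `Uuid` so that it compiles without Kafka on the classpath. `SimpleClientInstanceIds` is a hypothetical name, and modeling "telemetry disabled on the admin client" as a `null` admin id is an assumption.

```java
import java.util.Map;
import java.util.UUID;

// Local copy of the interface above, with java.util.UUID standing in for
// org.apache.kafka.common.Uuid.
interface ClientInstanceIds {
    UUID adminInstanceId();
    Map<String, UUID> consumerInstanceIds();
    Map<String, UUID> producerInstanceIds();
}

// Hypothetical immutable implementation; maps are keyed by thread key.
record SimpleClientInstanceIds(UUID adminId, Map<String, UUID> consumers, Map<String, UUID> producers)
        implements ClientInstanceIds {

    SimpleClientInstanceIds {
        consumers = Map.copyOf(consumers); // defensive, unmodifiable copies
        producers = Map.copyOf(producers);
    }

    @Override
    public UUID adminInstanceId() {
        // Assumption: a null admin id means telemetry is disabled on the admin client.
        if (adminId == null) {
            throw new IllegalStateException("Telemetry is disabled on the admin client");
        }
        return adminId;
    }

    @Override
    public Map<String, UUID> consumerInstanceIds() { return consumers; }

    @Override
    public Map<String, UUID> producerInstanceIds() { return producers; }
}
```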

Finally, a new method is added to org.apache.kafka.clients.CommonClientConfigs to return the ClientTelemetryReporter, if one is configured. This mirrors the similar metricsReporters() methods in that class.

Code Block
languagejava
public static Optional<ClientTelemetryReporter> telemetryReporter(String clientId, AbstractConfig config);
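The "if configured" behavior can be sketched as returning an empty `Optional` when metrics push is switched off. This is a hypothetical sketch, not the Kafka implementation:

- `TelemetryReporterStub` stands in for `ClientTelemetryReporter`;
- a plain `Map` replaces `AbstractConfig`;
- treating a missing `enable.metrics.push` key as `true` is an assumption.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for ClientTelemetryReporter.
final class TelemetryReporterStub {
    private final String clientId;
    TelemetryReporterStub(String clientId) { this.clientId = clientId; }
    String clientId() { return clientId; }
}

final class TelemetryReporters {
    private TelemetryReporters() {}

    // Returns a reporter only when enable.metrics.push is true (missing key assumed true).
    static Optional<TelemetryReporterStub> telemetryReporter(String clientId, Map<String, ?> config) {
        Object push = config.get("enable.metrics.push");
        boolean enabled = push == null || Boolean.parseBoolean(String.valueOf(push)); // Boolean or "true"/"false"
        return enabled ? Optional.of(new TelemetryReporterStub(clientId)) : Optional.empty();
    }
}
```

Returning `Optional` lets each client wire the reporter in alongside its other metrics reporters only when telemetry is enabled, without null checks.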


Broker configuration

Configuration | Description | Values
--- | --- | ---
telemetry.max.bytes | The maximum size (after compression, if compression is used) of telemetry pushed from a client to the broker. | int, default: 1048576, valid values: [1,...]
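A broker-side size check for `telemetry.max.bytes` might look like the sketch below. `TelemetrySizeCheck` is a hypothetical name. The point it illustrates is that the limit applies to the payload as received, so a large but compressible payload can pass once it is compressed. Gzip is used here only as an example codec.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

final class TelemetrySizeCheck {
    static final int DEFAULT_TELEMETRY_MAX_BYTES = 1048576; // telemetry.max.bytes default (1 MiB)

    private TelemetrySizeCheck() {}

    // Valid values are [1,...], so anything below 1 is rejected.
    static int validateMaxBytes(int maxBytes) {
        if (maxBytes < 1) {
            throw new IllegalArgumentException("telemetry.max.bytes must be >= 1");
        }
        return maxBytes;
    }

    // The limit applies to the bytes as received, i.e. after compression when used.
    static boolean withinLimit(byte[] receivedPayload, int maxBytes) {
        return receivedPayload.length <= validateMaxBytes(maxBytes);
    }

    // Illustrative client-side compression (gzip chosen only as an example).
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```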

...

version: broker's software version

Metric Name | Type | Group | Tags | Description
--- | --- | --- | --- | ---
ClientMetricsInstanceCount | Gauge | ClientMetrics | | Current number of client metric instances being managed by the broker, e.g., the number of unique CLIENT_INSTANCE_IDs with an empty or non-empty subscription set.
ClientMetricsUnknownSubscriptionRequestCount, ClientMetricsUnknownSubscriptionRequestRate | Meter | ClientMetrics | client version: client's software version | Total number/rate of PushTelemetryRequests with an unknown subscription id.
ClientMetricsThrottleCount, ClientMetricsThrottleRate | Meter | ClientMetrics | client_instance_id | Total number/rate of PushTelemetryRequests throttled because the client pushed more frequently than the allowed PushIntervalMs.
ClientMetricsPluginExportCount, ClientMetricsPluginExportRate | Meter | ClientMetrics | client_instance_id | Total number/rate of metrics requests pushed to metrics plugins, e.g., the number/rate of exportMetrics() calls.
ClientMetricsPluginErrorCount, ClientMetricsPluginErrorRate | Meter | ClientMetrics | client_instance_id, reason (reason for the failure) | Total number/rate of exceptions raised from the plugin's exportMetrics().
ClientMetricsPluginExportTimeAvg, ClientMetricsPluginExportTimeMax | Avg and Max | ClientMetrics | client_instance_id | Amount of time the broker spends invoking the plugin's exportMetrics() call.
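The throttling counted by ClientMetricsThrottleCount can be sketched as a per-client-instance check against PushIntervalMs. This is a hypothetical model. `PushThrottler` is an illustrative name, and two rules are assumptions rather than details taken from the text above: the first push is always accepted, and throttled pushes do not reset the client's clock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical per-client-instance throttle keyed by client instance id.
final class PushThrottler {
    private final Map<UUID, Long> lastAcceptedPushMs = new HashMap<>();
    private long throttleCount; // analogous to ClientMetricsThrottleCount

    // Accepts a push if at least pushIntervalMs has elapsed since the last accepted one.
    boolean tryAccept(UUID clientInstanceId, long nowMs, long pushIntervalMs) {
        Long last = lastAcceptedPushMs.get(clientInstanceId);
        if (last != null && nowMs - last < pushIntervalMs) {
            throttleCount++; // throttled pushes do not update the last-accepted time
            return false;
        }
        lastAcceptedPushMs.put(clientInstanceId, nowMs);
        return true;
    }

    long throttleCount() { return throttleCount; }
}
```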

...

Code Block
This tool helps to manipulate and describe client metrics configurations.
Option                                 Description                            
------                                 -----------                            
--alter                                Alter the configuration of the client
                                         metrics resource.
--block                                Block metrics collection.
--bootstrap-server <String: server to  REQUIRED: The Kafka server to connect to.  
  connect to>
--command-config <String: command      Property file containing configs to be 
  config property file>                  passed to Admin Client.                             
--delete                               Delete the configuration of the client
                                         metrics resource.
--describe                             List configurations for client metrics resources.   
--generate-name                        Generate a UUID to use as the name.
--help                                 Print usage information.               
--interval                             The metrics push interval in milliseconds.
--match                                Matching selector ‘k1=v1,k2=v2’. The following
                                         is a list of valid selector names:
                                         client_instance_id
                                         client_id
                                         client_software_name
                                         client_software_version
                                         client_source_address
                                         client_source_port
--metrics                              Telemetry metric name prefixes ‘m1,m2’.
--name <String>                        Name of client metrics configuration resource. 
--version                              Display Kafka version.                 
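The `--match` value can be parsed and validated roughly as follows. This is a hypothetical sketch, not the tool's code. `MatchSelectorParser` is an illustrative name. The valid selector names are taken from the help text, and values are compiled as regular expressions, as the `1\.2\..*` version pattern in the examples suggests. One limitation is deliberate for brevity: splitting on every comma breaks regexes that themselves contain commas (such as `{1,2}`).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

final class MatchSelectorParser {
    // Selector names listed in the --match help text above.
    static final Set<String> VALID_SELECTORS = Set.of(
            "client_instance_id", "client_id", "client_software_name",
            "client_software_version", "client_source_address", "client_source_port");

    private MatchSelectorParser() {}

    // Parses "k1=v1,k2=v2" into selector name -> compiled regex.
    static Map<String, Pattern> parse(String spec) {
        Map<String, Pattern> selectors = new LinkedHashMap<>();
        for (String pair : spec.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected k=v but got: " + pair);
            }
            String name = pair.substring(0, eq).trim();
            if (!VALID_SELECTORS.contains(name)) {
                throw new IllegalArgumentException("Unknown selector name: " + name);
            }
            // Invalid regexes surface as PatternSyntaxException, an IllegalArgumentException.
            selectors.put(name, Pattern.compile(pair.substring(eq + 1).trim()));
        }
        return selectors;
    }
}
```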

...

Code Block
languagebash
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --alter --name MYMETRICS \
  --metrics org.apache.kafka.consumer. \
  --interval 60000 \
  --match "client_software_name=kafka_python,client_software_version=1\.2\..*"

$ kafka-configs.sh --bootstrap-server $BROKERS --alter --entity-type client-metrics --entity-name MYMETRICS \
  --add-config "metrics=org.apache.kafka.consumer.,interval.ms=60000,match=[client_software_name=kafka.python,client_software_version=1\.2\..*]"

Block an existing client metrics configuration resource from pushing metrics

Metrics pushes for a client metrics configuration resource are blocked by setting its push interval to 0 ms.

Code Block
languagebash
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --alter --name MYMETRICS --block

$ kafka-configs.sh --bootstrap-server $BROKERS --alter --entity-type client-metrics --entity-name MYMETRICS \
  --add-config "interval.ms=0"

Delete a client metrics configuration resource

...