
Status

Current state: Approved

Discussion thread: here and now here

JIRA: KAFKA-15601

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

INVALID_REQUEST (42)
  Reason: Client sent a PushTelemetryRequest when it has already sent a Terminating request.
  Client action: Log an error and stop pushing metrics. This is viewed as a problem in the client implementation of metrics serialization that is not likely to be resolved by retrying.

INVALID_RECORD (87)
  Reason: Broker failed to decode or validate the client’s encoded metrics.
  Client action: Log an error and stop pushing metrics. This is viewed as a problem in the client implementation of metrics serialization that is not likely to be resolved by retrying.

TELEMETRY_TOO_LARGE (NEW)
  Reason: Client sent a PushTelemetryRequest larger than the maximum size the broker will accept.
  Client action: Reduce the size of the metrics payload so that it does not exceed GetTelemetrySubscriptionsResponse.TelemetryMaxBytes.

UNKNOWN_SUBSCRIPTION_ID (NEW)
  Reason: Client sent a PushTelemetryRequest with an invalid or outdated SubscriptionId; the configured subscriptions have changed.
  Client action: Immediately send a GetTelemetrySubscriptionsRequest to update the client's subscriptions and get a new SubscriptionId.

UNSUPPORTED_COMPRESSION_TYPE (76)
  Reason: Client’s compression type is not supported by the broker.
  Client action: Immediately send a GetTelemetrySubscriptionsRequest to get an up-to-date list of the broker's supported compression types (and any subscription changes).
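A client could encode the actions in the table above as a simple lookup. The following Java sketch is illustrative only: the enum and class names are hypothetical, the lookup is keyed by error name because the two new codes have no numeric value assigned yet, and errors the table does not list are deliberately left unmapped rather than guessed.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical client-side reactions to a PushTelemetryResponse error (names are illustrative).
enum PushAction {
    STOP_PUSHING,   // log an error and stop pushing metrics; retrying is unlikely to help
    SHRINK_PAYLOAD, // reduce the payload so it fits within TelemetryMaxBytes
    RESUBSCRIBE     // immediately send a GetTelemetrySubscriptionsRequest
}

final class PushErrorPolicy {
    private PushErrorPolicy() {}

    // Keyed by error name because the new codes have no numeric value in the table yet.
    private static final Map<String, PushAction> ACTIONS = Map.of(
            "INVALID_REQUEST", PushAction.STOP_PUSHING,
            "INVALID_RECORD", PushAction.STOP_PUSHING,
            "TELEMETRY_TOO_LARGE", PushAction.SHRINK_PAYLOAD,
            "UNKNOWN_SUBSCRIPTION_ID", PushAction.RESUBSCRIBE,
            "UNSUPPORTED_COMPRESSION_TYPE", PushAction.RESUBSCRIBE);

    /** Returns the action from the table, or empty for errors the table does not cover. */
    static Optional<PushAction> actionFor(String errorName) {
        if (errorName == null) {
            return Optional.empty(); // Map.of maps reject null keys on lookup
        }
        return Optional.ofNullable(ACTIONS.get(errorName));
    }
}
```

Keeping the mapping in one table-driven place makes it easy to check against the error table when codes are added.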

...

When a GetTelemetrySubscriptionsRequest is received for a previously unknown client instance id, the CLIENT_METRICS config cache is scanned for any configured metric subscriptions whose match selectors match the client. The matching configuration entries are compiled into a list of subscribed metrics, which is returned in GetTelemetrySubscriptionsResponse.RequestedMetrics along with the minimum configured collection interval. (This can be improved in future versions by including a per-metric interval so that each subscribed metric is collected at its configured interval; in the current form, longer-interval metrics are included “for free” if there are shorter-interval metrics in the subscription set.) A CRC32C checksum is also calculated over the compiled metrics; it is returned as the SubscriptionId in the response and stored in the per-client-instance cache on the broker to track configuration changes.
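The compilation step can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the broker's implementation: it assumes the match-selector filtering has already happened, and the record names (MatchedSubscription, CompiledSubscription), the fallback interval used when nothing matches, and the collapsing of any "*" entry into a single wildcard are all choices of the sketch. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical view of one CLIENT_METRICS entry whose match selectors already matched the client.
record MatchedSubscription(List<String> metrics, int intervalMs) {}

// Hypothetical result: the values returned as RequestedMetrics and PushIntervalMs.
record CompiledSubscription(List<String> requestedMetrics, int pushIntervalMs) {}

final class SubscriptionCompiler {
    private SubscriptionCompiler() {}

    static CompiledSubscription compile(List<MatchedSubscription> matched, int fallbackIntervalMs) {
        Set<String> prefixes = new LinkedHashSet<>(); // union of prefixes, duplicates removed
        int intervalMs = fallbackIntervalMs;          // used only when nothing matched (assumption)
        boolean first = true;
        for (MatchedSubscription s : matched) {
            prefixes.addAll(s.metrics());
            // The shortest configured interval wins, so longer-interval metrics ride along "for free".
            intervalMs = first ? s.intervalMs() : Math.min(intervalMs, s.intervalMs());
            first = false;
        }
        // If any entry asks for all metrics, collapse the list to the "*" wildcard (assumption).
        List<String> requested = prefixes.contains("*") ? List.of("*") : new ArrayList<>(prefixes);
        return new CompiledSubscription(requested, intervalMs);
    }
}
```

For example, entries ["org.apache.kafka.producer."] at 60000 ms and ["org.apache.kafka.consumer.", "org.apache.kafka.producer."] at 30000 ms compile to both prefixes with a 30000 ms push interval.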

...

Code Block
GetTelemetrySubscriptionsRequestV0 {
    ClientInstanceId uuid                // UUID4 unique for this client instance.
                                         // Must be set to Null on the first request, and to the returned ClientInstanceId
                                         // from the first response for all subsequent requests to any broker.
}

GetTelemetrySubscriptionsResponseV0 {
    ThrottleTimeMs int32                 // The duration in milliseconds for which the request was throttled due to a quota violation,
                                         // or zero if the request did not violate any quota.
    ErrorCode int16                      // The error code, or 0 if there was no error.
    ClientInstanceId uuid                // Assigned client instance id if ClientInstanceId was Null in the request, else Null.
    SubscriptionId int32                 // Unique identifier for the current subscription set for this client instance.
    AcceptedCompressionTypes Array[int8] // The compression types the broker accepts for PushTelemetryRequest.CompressionType
                                         // as listed in MessageHeaderV2.Attributes.CompressionType. The array will be sorted in
                                         // preference order from higher to lower. The CompressionType of NONE will not be
                                         // present in the response from the broker, though the broker does support uncompressed
                                         // client telemetry if none of the accepted compression codecs are supported by the client.
    PushIntervalMs int32                 // Configured push interval, which is the lowest configured interval in the current subscription set.
    TelemetryMaxBytes int32              // The maximum bytes of binary data the broker accepts in PushTelemetryRequest.
    DeltaTemporality bool                // If True; monotonic/counter metrics are to be emitted as deltas to the previous sample.
                                         // If False; monotonic/counter metrics are to be emitted as cumulative absolute values.
    RequestedMetrics Array[string]       // Requested telemetry metrics prefix string match.
                                         // Empty array: No metrics subscribed.
                                         // Array[0] "*": All metrics subscribed.
                                         // Array[..]: prefix string match.
}

PushTelemetryRequestV0 {
    ClientInstanceId uuid                // UUID4 unique for this client instance, as retrieved in the first GetTelemetrySubscriptionsRequest.
    SubscriptionId int32                 // SubscriptionId from the GetTelemetrySubscriptionsResponse for the collected metrics.
    Terminating bool                     // Client is terminating.
    CompressionType int8                 // Compression codec used for .Metrics (ZSTD, LZ4, Snappy, GZIP, None).
                                         // Same values as that of the current MessageHeaderV2.Attributes.
    Metrics binary                       // Metrics encoded in OpenTelemetry MetricsData v1 protobuf format.
}

PushTelemetryResponseV0 {
    ThrottleTimeMs int32                 // The duration in milliseconds for which the request was throttled due to a quota violation,
                                         // or zero if the request did not violate any quota.
    ErrorCode int16                      // The error code, or 0 if there was no error.
}
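On the client side, the two response fields whose semantics are spelled out in the comments above, AcceptedCompressionTypes and RequestedMetrics, could be applied roughly as in this Java sketch. The class and method names are hypothetical. The compression ids follow the record-batch attribute values Kafka uses (0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd), and treating a "*" entry anywhere in the list as "all metrics" is an assumption.

```java
import java.util.List;
import java.util.Set;

final class TelemetryResponseHandling {
    private TelemetryResponseHandling() {}

    // Compression ids as used in record-batch attributes: 0=none, 1=gzip, 2=snappy, 3=lz4, 4=zstd.
    static final byte NONE = 0;

    /**
     * Picks the first codec in the broker's preference-ordered AcceptedCompressionTypes
     * that the client also supports; falls back to NONE, which the broker always accepts.
     */
    static byte chooseCompression(List<Byte> acceptedByBroker, Set<Byte> supportedByClient) {
        for (byte type : acceptedByBroker) {
            if (supportedByClient.contains(type)) {
                return type;
            }
        }
        return NONE;
    }

    /**
     * RequestedMetrics semantics: an empty list selects nothing, "*" selects everything,
     * and any other entry is a metric-name prefix.
     */
    static boolean isRequested(List<String> requestedMetrics, String metricName) {
        for (String entry : requestedMetrics) {
            if (entry.equals("*") || metricName.startsWith(entry)) {
                return true;
            }
        }
        return false;
    }
}
```

Iterating the broker's list (rather than the client's) is what honours the broker's stated preference order.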

Metrics serialization format

...

The SubscriptionId is a unique identifier for a client instance's subscription set. It is generated by calculating a CRC32C of the configured metrics subscriptions matching the given client, including the PushIntervalMs, XORed with the ClientInstanceId. The SubscriptionId is returned to the client in the GetTelemetrySubscriptionsResponse, and the client passes it in each subsequent PushTelemetryRequest for the metrics it received. If the configured subscriptions are updated in a way that changes a client instance's subscription set, the SubscriptionId is recalculated. On the next PushTelemetryRequest carrying the previous SubscriptionId, the broker finds that the received and expected SubscriptionIds differ and returns UNKNOWN_SUBSCRIPTION_ID to the client. When a client receives this error code, it immediately sends a GetTelemetrySubscriptionsRequest to retrieve the new subscription set along with a new SubscriptionId.
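The SubscriptionId derivation and the broker-side staleness check can be sketched in Java with the JDK's java.util.zip.CRC32C (Java 9+). This is a hypothetical illustration: the text does not specify the exact bytes fed to the checksum or how the 128-bit ClientInstanceId is XORed into a 32-bit value, so the UTF-8 encoding with a zero-byte separator, the big-endian interval, and folding the UUID's four 32-bit words together are all assumptions. java.util.UUID stands in for Kafka's Uuid.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import java.util.zip.CRC32C;

final class SubscriptionIds {
    private SubscriptionIds() {}

    /** CRC32C over the compiled metric prefixes plus PushIntervalMs, XORed with the folded instance id. */
    static int compute(List<String> metrics, int pushIntervalMs, UUID clientInstanceId) {
        CRC32C crc = new CRC32C();
        for (String m : metrics) {
            crc.update(m.getBytes(StandardCharsets.UTF_8));
            crc.update(0); // separator so ["ab", "c"] and ["a", "bc"] hash differently (assumption)
        }
        crc.update(ByteBuffer.allocate(4).putInt(pushIntervalMs).array()); // big-endian interval
        long hi = clientInstanceId.getMostSignificantBits();
        long lo = clientInstanceId.getLeastSignificantBits();
        // Fold the 128-bit id into 32 bits by XORing its four words (assumption).
        int folded = (int) (hi >>> 32) ^ (int) hi ^ (int) (lo >>> 32) ^ (int) lo;
        return (int) crc.getValue() ^ folded;
    }

    /** Broker-side check on a PushTelemetryRequest: a stale id yields UNKNOWN_SUBSCRIPTION_ID. */
    static String validatePush(int receivedId, int expectedId) {
        return receivedId == expectedId ? "NONE" : "UNKNOWN_SUBSCRIPTION_ID";
    }
}
```

Because the interval is part of the checksum input, changing only interval.ms on a matching subscription is enough to invalidate the client's SubscriptionId.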

...

An application may retrieve the broker-generated client instance id, for example to help map the client instance id to the application instance through log messages or other means.

The client instance ids returned correspond to the client_instance_id labels added by the broker to the metrics pushed from the clients. This should be sufficient information to enable correlation between the metrics available in the client and the metrics pushed to the broker.

The following method is added to the Producer, Consumer, and Admin client interfaces:



...

 * @throws InterruptException       If the thread is interrupted while blocked.
 * @throws TimeoutException         Indicates that a request timed out.
 * @throws KafkaException           If an unexpected error occurs while trying to determine the client
 *                                  instance ID, though this error does not necessarily imply the
 *                                  client is otherwise unusable.
 * @throws IllegalArgumentException If the {@code timeout} is negative.
 * @throws IllegalStateException    If telemetry is not enabled, i.e., config {@code enable.metrics.push}
 *                                  is set to {@code false}.
 */
public Uuid clientInstanceId(Duration timeout);

If the client has not yet requested a client instance id from the broker, this call may block for up to the duration of the timeout. If the client instance id cannot be obtained within the timeout, the method throws org.apache.kafka.common.errors.TimeoutException.
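The documented contract (argument check, opt-out check, bounded wait) can be illustrated with a small stand-alone Java class. This is a hypothetical sketch, not the Kafka client code: it uses java.util.UUID instead of Kafka's Uuid, and a local exception class plus plain RuntimeExceptions stand in for Kafka's TimeoutException, InterruptException, and KafkaException so that it compiles with the JDK alone.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException. */
class ClientInstanceIdTimeoutException extends RuntimeException {
    ClientInstanceIdTimeoutException(String message) { super(message); }
}

/** Hypothetical holder mirroring the documented behaviour of clientInstanceId(Duration). */
final class ClientInstanceIdHolder {
    private final boolean metricsPushEnabled; // value of enable.metrics.push
    private final CompletableFuture<UUID> assigned = new CompletableFuture<>();

    ClientInstanceIdHolder(boolean metricsPushEnabled) { this.metricsPushEnabled = metricsPushEnabled; }

    /** Called when a GetTelemetrySubscriptionsResponse carries the broker-assigned id. */
    void onAssigned(UUID id) { assigned.complete(id); }

    UUID clientInstanceId(Duration timeout) {
        if (timeout.isNegative()) throw new IllegalArgumentException("timeout must not be negative");
        if (!metricsPushEnabled) throw new IllegalStateException("enable.metrics.push is false");
        try {
            // Returns at once if the id is already known; otherwise waits up to the timeout.
            return assigned.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (java.util.concurrent.TimeoutException e) {
            throw new ClientInstanceIdTimeoutException("client instance id not obtained within " + timeout);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting", e); // Kafka: InterruptException
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause()); // Kafka: KafkaException
        }
    }
}
```

A zero timeout therefore acts as a non-blocking probe: it returns the id if one has already been assigned and times out immediately otherwise.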

In addition, the following method is added to the KafkaStreams interface to give access to the client instance ids of the producers, consumers and admin clients used by Kafka Streams:

Code Block
/**
 * @return The internal clients' assigned instance ids used for metrics collection.
 *
 * @throws IllegalArgumentException If {@code timeout} is negative.
 * @throws IllegalStateException If {@code KafkaStreams} is not running.
 * @throws TimeoutException Indicates that a request timed out.
 * @throws StreamsException For any other error that might occur.
 */
public ClientInstanceIds clientInstanceIds(Duration timeout);

This method is only permitted while Kafka Streams is in state RUNNING or REBALANCING; in any other state, it throws an IllegalStateException.

If any of the client instance ids cannot be obtained within the timeout, the method throws org.apache.kafka.common.errors.TimeoutException.

The new interface org.apache.kafka.streams.ClientInstanceIds is defined as follows:

Code Block
/**
 * Encapsulates the client instance ids used for metrics collection by
 * producers, consumers and admin clients used by Kafka Streams.
 */
public interface ClientInstanceIds {
  /**
   * Get the client instance id of the admin client
   *
   * @return the client instance id
   *
   * @throws IllegalStateException If telemetry is disabled on the admin client.
   */
  Uuid adminInstanceId();

  /**
   * Get the client instance ids of the consumers
   *
   * @return a map from thread key to client instance id
   */
  Map<String, Uuid> consumerInstanceIds();

  /**
   * Get the client instance ids of the producers
   *
   * @return a map from thread key to client instance id
   */
  Map<String, Uuid> producerInstanceIds();
}

Finally, a new method is added to org.apache.kafka.clients.CommonClientConfigs to return the ClientTelemetryReporter, if one is configured. This mirrors the similar metricsReporters() methods in that class.

Code Block
languagejava
public static Optional<ClientTelemetryReporter> telemetryReporter(String clientId, AbstractConfig config);


Broker configuration

telemetry.max.bytes
  Description: The maximum size (after compression, if compression is used) of telemetry pushed from a client to the broker.
  Values: int, default: 1048576, valid values: [1,...]

Client configuration

This applies to producers, consumers, admin client, and of course embedded uses of these clients in frameworks such as Kafka Connect.

enable.metrics.push
  Description: Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client.
  Values:
    true (default) - The client will push metrics if there are any matching subscriptions.
    false - The client will not push metrics.

Following the usual convention, the string constant ENABLE_METRICS_PUSH_CONFIG ("enable.metrics.push") will be added to CommonClientConfigs, ProducerConfig, ConsumerConfig, AdminClientConfig and StreamsConfig.
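As an illustration, opting a client out of metrics push only requires setting this one property. The Java sketch below uses the literal key so that it compiles without kafka-clients on the classpath; real code would use the ENABLE_METRICS_PUSH_CONFIG constant described above. The helper class name is hypothetical.

```java
import java.util.Properties;

final class TelemetryOptOut {
    private TelemetryOptOut() {}

    // Literal value of ENABLE_METRICS_PUSH_CONFIG, used so the sketch needs only the JDK.
    static final String ENABLE_METRICS_PUSH = "enable.metrics.push";

    /** Builds client properties; pushMetrics=false opts this client out of metrics push. */
    static Properties clientProps(String bootstrapServers, boolean pushMetrics) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put(ENABLE_METRICS_PUSH, Boolean.toString(pushMetrics)); // default is true
        return props;
    }
}
```

The resulting Properties would be passed to a producer or consumer constructor, or to Admin.create(...), in the usual way.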

Client metrics configuration

These are the configurations for client metrics resources. A client metrics subscription is defined by the configurations for a resource of type CLIENT_METRICS.

metrics
  Description: A list of telemetry metric name prefixes which specify the metrics of interest.
    An empty list means no metrics subscribed.
    A list containing just "*" means all metrics subscribed.
    Otherwise, the list entries are prefix-matched against the metric names.

interval.ms
  Description: The client metrics push interval in milliseconds.
  Values: Default: 300000 (5 minutes), minimum: 100, maximum: 3600000 (1 hour)

match
  Description: The match criteria for selecting which clients the subscription matches. If a client matches all of these criteria, the client matches the subscription.
    A list of key-value pairs. The valid keys are:
      • client_instance_id - CLIENT_INSTANCE_ID UUID string representation.
      • client_id - client's reported client.id in the GetTelemetrySubscriptionsRequest.
      • client_software_name - client software implementation name.
      • client_software_version - client software implementation version.
      • client_source_address - client connection's source address from the broker's point of view.
      • client_source_port - client connection's source port from the broker's point of view.
    The values are anchored regular expressions.
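Because the selector values are anchored regular expressions and a client must satisfy every criterion, matching a client against a subscription can be sketched with java.util.regex.Pattern.matches, which only succeeds when the entire input matches the pattern. The class name and the map-based representation of the client's attributes are assumptions of this sketch.

```java
import java.util.Map;
import java.util.regex.Pattern;

final class MatchSelectors {
    private MatchSelectors() {}

    /**
     * Returns true only if every selector's regex matches the client's value for that key.
     * A key the client does not report counts as a mismatch. With no selectors at all the
     * loop returns true; whether that is the intended semantics is an assumption here.
     */
    static boolean matches(Map<String, String> selectors, Map<String, String> clientAttributes) {
        for (Map.Entry<String, String> e : selectors.entrySet()) {
            String value = clientAttributes.get(e.getKey());
            // Pattern.matches tests the whole value, which gives the anchored semantics.
            if (value == null || !Pattern.matches(e.getValue(), value)) {
                return false;
            }
        }
        return true;
    }
}
```

Anchoring matters in practice: the selector 1\.2 does not match client version 1.2.7, while 1\.2\..* does.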

New error codes

TELEMETRY_TOO_LARGE - Client sent a PushTelemetryRequest with a payload that was too large.

UNKNOWN_SUBSCRIPTION_ID - Client sent a PushTelemetryRequest with an invalid or outdated SubscriptionId. The configured subscriptions have changed.

Metrics

The following new broker metrics should be added:

ClientMetricsInstanceCount
  Type: Gauge
  Group: ClientMetrics
  Description: Current number of client metric instances being managed by the broker, e.g., the number of unique CLIENT_INSTANCE_IDs with an empty or non-empty subscription set.

ClientMetricsUnknownSubscriptionRequestCount, ClientMetricsUnknownSubscriptionRequestRate
  Type: Meter
  Group: ClientMetrics
  Description: Total number/rate of PushTelemetryRequests with an unknown subscription id.

ClientMetricsThrottleCount, ClientMetricsThrottleRate
  Type: Meter
  Group: ClientMetrics
  Tags: client_instance_id
  Description: Total number/rate of telemetry requests throttled because the client's request rate exceeded the rate allowed by PushIntervalMs.

ClientMetricsPluginExportCount, ClientMetricsPluginExportRate
  Type: Meter
  Group: ClientMetrics
  Tags: client_instance_id
  Description: Total number/rate of metrics requests pushed to metrics plugins, e.g., the number/rate of exportMetrics() calls.

ClientMetricsPluginErrorCount, ClientMetricsPluginErrorRate
  Type: Meter
  Group: ClientMetrics
  Tags: client_instance_id
  Description: Total number/rate of exceptions raised from the plugin's exportMetrics().

ClientMetricsPluginExportTimeAvg, ClientMetricsPluginExportTimeMax
  Type: Avg and Max
  Group: ClientMetrics
  Tags: client_instance_id
  Description: Amount of time the broker spends invoking the plugin's exportMetrics() call.
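The throttling counted by ClientMetricsThrottleCount and ClientMetricsThrottleRate can be illustrated with a simple per-client-instance check in Java. This is a hypothetical sketch, not the broker's algorithm: it compares each push against the previous accepted push with no tolerance for clock jitter or network delay, whereas a real broker may well allow some slack, and it uses java.util.UUID in place of Kafka's Uuid.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical broker-side check: pushes arriving faster than PushIntervalMs are throttled. */
final class PushRateLimiter {
    private final Map<UUID, Long> lastAcceptedPushMs = new HashMap<>();
    private long throttleCount; // what ClientMetricsThrottleCount would report in this sketch

    /** Returns true if the push is accepted, false if it should be throttled. */
    synchronized boolean tryAccept(UUID clientInstanceId, long nowMs, int pushIntervalMs) {
        Long last = lastAcceptedPushMs.get(clientInstanceId);
        if (last != null && nowMs - last < pushIntervalMs) {
            throttleCount++;
            return false;
        }
        lastAcceptedPushMs.put(clientInstanceId, nowMs); // only accepted pushes reset the clock
        return true;
    }

    synchronized long throttleCount() {
        return throttleCount;
    }
}
```

Resetting the timestamp only on accepted pushes means a client that retries too quickly cannot push the deadline further out by hammering the broker.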

Security

Since client metric subscriptions are primarily aimed at the infrastructure operator who manages the Kafka cluster, it should be sufficient to limit the config control operations to the CLUSTER resource.

There will be no permission checks on the GetTelemetrySubscriptionsRequest or the PushTelemetryRequest themselves.

DescribeConfigs
  Resource: CLUSTER
  ACL operation: DESCRIBE_CONFIGS

AlterConfigs
  Resource: CLUSTER
  ACL operation: ALTER_CONFIGS

GetTelemetrySubscriptions
  Resource: N/A
  ACL operation: N/A

PushTelemetry
  Resource: N/A
  ACL operation: N/A

Tools

kafka-configs.sh

The kafka-configs.sh tool can be used to administer client metrics configuration. The kafka-client-metrics.sh tool is preferred because it is easier to use.

A new entity-type of client-metrics is added.

For this entity type, the allowed configs are: interval.ms, metrics and match.

Some examples of use of kafka-configs.sh are shown in the next section for comparison with kafka-client-metrics.sh.

kafka-client-metrics.sh

A new kafka-client-metrics.sh tool is added, which provides a simpler interface for administering client metrics configuration resources than using kafka-configs.sh directly.

Here's the command-line syntax summary.


Code Block
This tool helps to manipulate and describe client metrics configurations.
Option                                 Description
------                                 -----------
--alter                                Alter the configuration of the client
                                         metrics resource.
--bootstrap-server <String: server to  REQUIRED: The Kafka server to connect
  connect to>                            to.
--command-config <String: command      Property file containing configs to be
  config property file>                  passed to Admin Client.
--delete                               Delete the configuration of the client
                                         metrics resource.
--describe                             List configurations for client metrics
                                         resources.
--generate-name                        Generate a UUID to use as the name.
--help                                 Print usage information.
--interval                             The metrics push interval in
                                         milliseconds.
--match                                Matching selector 'k1=v1,k2=v2'. The
                                         following is a list of valid selector
                                         names:
                                         client_instance_id
                                         client_id
                                         client_software_name
                                         client_software_version
                                         client_source_address
                                         client_source_port
--metrics                              Telemetry metric name prefixes 'm1,m2'.
--name <String>                        Name of client metrics configuration
                                         resource.
--version                              Display Kafka version.

Here are some examples.

List all client metrics configuration resources

Code Block
languagebash
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --describe

$ kafka-configs.sh --bootstrap-server $BROKERS --describe --entity-type client-metrics

Describe a client metrics configuration resource

Code Block
languagebash
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --describe --name MYMETRICS

$ kafka-configs.sh --bootstrap-server $BROKERS --describe --entity-type client-metrics --entity-name MYMETRICS

Create a client metrics configuration resource, generating a unique name

In this example, --generate-name causes the tool to create a type-4 UUID to use as the client metrics configuration resource name. There is no equivalent in kafka-configs.sh.

Code Block
languagebash
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --alter --generate-name \
  --metrics org.apache.kafka.producer.node.request.latency.,org.apache.kafka.consumer.node.request.latency. \
  --interval 60000

Create a client metrics configuration resource for all Python v1.2.* clients

Code Block
languagebash
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --alter --name MYMETRICS \
  --metrics org.apache.kafka.consumer. \
  --interval 60000 \
  --match "client_software_name=kafka_python,client_software_version=1\.2\..*"

$ kafka-configs.sh --bootstrap-server $BROKERS --alter --entity-type client-metrics --entity-name MYMETRICS \
  --add-config "metrics=org.apache.kafka.consumer.,interval.ms=60000,match=[client_software_name=kafka.python,client_software_version=1\.2\..*]"

Delete a client metrics configuration resource

Deletion of a client metrics configuration resource with kafka-configs.sh requires listing the names of the configs to delete.

Code Block
languagebash
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --delete --name MYMETRICS

$ kafka-configs.sh --bootstrap-server $BROKERS --alter --entity-type client-metrics --entity-name MYMETRICS \
  --delete-config metrics,interval.ms,match
...