...
When a GetTelemetrySubscriptionsRequest is received for a previously unknown client instance id, the broker scans the CLIENT_METRICS config cache for configured metric subscriptions whose match selectors match the client. The matching configuration entries are compiled into a list of subscribed metrics, which is returned in GetTelemetrySubscriptionsResponse.RequestedMetrics together with the minimum configured collection interval. (A future version could improve on this by including a per-metric interval, so that each subscribed metric is collected at its own configured interval. In the current form, metrics with a longer interval are included “for free” whenever the subscription set also contains metrics with a shorter interval.) A CRC32C checksum is also calculated from the compiled metrics. It is returned as the SubscriptionId in the response and stored in the per-client-instance cache on the broker to track configuration changes.
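The compilation step described above can be sketched as follows (Java 16+ for records). This is a minimal sketch, assuming each configuration entry exposes its metric name prefixes, its push interval and a match predicate over the client's attributes. `SubscriptionConfig`, `CompiledSubscription` and `SubscriptionCompiler` are hypothetical names, not the broker's actual classes.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

/** Hypothetical view of one CLIENT_METRICS configuration entry. */
record SubscriptionConfig(Set<String> metricPrefixes,
                          int intervalMs,
                          Predicate<Map<String, String>> matches) {}

/** Hypothetical shape of what is returned in GetTelemetrySubscriptionsResponse. */
record CompiledSubscription(Set<String> requestedMetrics, int pushIntervalMs) {}

final class SubscriptionCompiler {
    private SubscriptionCompiler() {}

    /**
     * Unions the metric prefixes of every entry whose selectors match the client and
     * uses the smallest configured interval for the whole set.
     * Returns empty when no entry matches (an empty subscription set).
     */
    static Optional<CompiledSubscription> compile(List<SubscriptionConfig> configs,
                                                  Map<String, String> clientAttributes) {
        Set<String> metrics = new TreeSet<>(); // sorted, so the result is deterministic
        int minIntervalMs = Integer.MAX_VALUE;
        boolean matched = false;
        for (SubscriptionConfig config : configs) {
            if (config.matches().test(clientAttributes)) {
                matched = true;
                metrics.addAll(config.metricPrefixes());
                // Longer-interval metrics are collected at the shortest interval in the set.
                minIntervalMs = Math.min(minIntervalMs, config.intervalMs());
            }
        }
        return matched
                ? Optional.of(new CompiledSubscription(metrics, minIntervalMs))
                : Optional.empty();
    }
}
```

Sorting the compiled set is a design choice of the sketch: it keeps the list stable across calls, which matters if a checksum is later computed over it.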
...
The SubscriptionId is a unique identifier for a client instance's subscription set. It is generated by calculating a CRC32C checksum of the configured metrics subscriptions matching the given client, including the PushIntervalMs, and XORing the result with the ClientInstanceId. The SubscriptionId is returned to the client in the GetTelemetrySubscriptionsResponse, and the client passes it in each subsequent PushTelemetryRequest for the received metrics. If the configured subscriptions are updated in a way that changes the subscription set of a client instance, the SubscriptionId is recalculated. On the next PushTelemetryRequest carrying the previous SubscriptionId, the broker finds that the received and expected SubscriptionIds differ and returns UNKNOWN_SUBSCRIPTION_ID to the client. When a client receives this error code, it immediately sends a GetTelemetrySubscriptionsRequest to retrieve the new subscription set along with a new SubscriptionId.
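A minimal sketch of the SubscriptionId computation, using `java.util.zip.CRC32C` (Java 9+). Two details are assumptions made for illustration: how the metric names and the interval are serialized before hashing, and how the 128-bit ClientInstanceId is folded to 32 bits for the XOR (here via `UUID.hashCode()`). `java.util.UUID` stands in for Kafka's `Uuid`, and `SubscriptionIds` is a hypothetical class.

```java
import java.nio.charset.StandardCharsets;
import java.util.SortedSet;
import java.util.UUID;
import java.util.zip.CRC32C;

final class SubscriptionIds {
    private SubscriptionIds() {}

    /**
     * CRC32C over the (sorted) subscribed metrics plus the push interval,
     * XORed with a 32-bit fold of the client instance id.
     * The "m1,m2;interval" encoding is a hypothetical choice.
     */
    static int compute(SortedSet<String> metrics, int pushIntervalMs, UUID clientInstanceId) {
        byte[] input = (String.join(",", metrics) + ";" + pushIntervalMs)
                .getBytes(StandardCharsets.UTF_8);
        CRC32C crc = new CRC32C();
        crc.update(input, 0, input.length);
        // getValue() returns the unsigned 32-bit checksum in a long; keep the low 32 bits.
        int checksum = (int) crc.getValue();
        // Assumption: fold the 128-bit instance id to 32 bits before XORing.
        return checksum ^ clientInstanceId.hashCode();
    }
}
```

A broker keeping the id it last issued would recompute it whenever the matching configuration changes; a push carrying the old value then no longer equals the expected one, which is the point at which UNKNOWN_SUBSCRIPTION_ID is returned.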
...
```java
/**
 * @param timeout The maximum time to wait for the client instance id.
 * @return The client's assigned instance id used for metrics collection.
 * @throws InterruptException If the thread is interrupted while blocked.
 * @throws TimeoutException Indicates that a request timed out.
 * @throws KafkaException If an unexpected error occurs while trying to determine the client
 *                        instance ID, though this error does not necessarily imply the
 *                        consumer client is otherwise unusable.
 * @throws IllegalArgumentException If the {@code timeout} is negative.
 * @throws IllegalStateException If telemetry is not enabled, i.e., config {@code enable.metrics.push}
 *                               is set to {@code false}.
 */
public Uuid clientInstanceId(Duration timeout);
```
If the client has not yet requested a client instance id from the broker, this call may block for up to the duration of the timeout. If the client instance id cannot be obtained within the timeout, the method throws `org.apache.kafka.common.errors.TimeoutException`.
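The contract above (return at once if the id is known, otherwise block for at most `timeout`; reject a negative timeout; fail when telemetry is disabled) can be sketched with a `CompletableFuture` that a telemetry sender completes once the broker assigns an id. This is a sketch under stated assumptions: `InstanceIdHolder` and its methods are hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and plain unchecked exceptions stand in for Kafka's `TimeoutException`, `InterruptException` and `KafkaException`.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class InstanceIdHolder {
    /** Stand-in for org.apache.kafka.common.errors.TimeoutException (unchecked). */
    static final class InstanceIdTimeoutException extends RuntimeException {
        InstanceIdTimeoutException(String message) { super(message); }
    }

    private final boolean telemetryEnabled; // mirrors enable.metrics.push
    private final CompletableFuture<UUID> assignedId = new CompletableFuture<>();

    InstanceIdHolder(boolean telemetryEnabled) {
        this.telemetryEnabled = telemetryEnabled;
    }

    /** Hypothetical hook called when GetTelemetrySubscriptionsResponse returns an id. */
    void onInstanceIdAssigned(UUID id) {
        assignedId.complete(id);
    }

    UUID clientInstanceId(Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        if (!telemetryEnabled) {
            throw new IllegalStateException("Telemetry is not enabled (enable.metrics.push=false).");
        }
        try {
            // Returns immediately if the id is already known; otherwise waits up to the timeout.
            return assignedId.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new InstanceIdTimeoutException("Client instance id not obtained within " + timeout);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException("Interrupted while waiting (stand-in for InterruptException)", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Unexpected failure (stand-in for KafkaException)", e.getCause());
        }
    }
}
```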
In addition, the following method is added to the KafkaStreams class to give access to the client instance ids of the producers, consumers and admin client used by Kafka Streams:
```java
/**
 * @param timeout The maximum time to wait for the client instance ids.
 * @return The internal clients' assigned instance ids used for metrics collection.
 *
 * @throws IllegalArgumentException If {@code timeout} is negative.
 * @throws IllegalStateException If {@code KafkaStreams} is not running.
 * @throws TimeoutException Indicates that a request timed out.
 * @throws StreamsException For any other error that might occur.
 */
public ClientInstanceIds clientInstanceIds(Duration timeout);
```
This method is only permitted when Kafka Streams is in state RUNNING or REBALANCING. If Kafka Streams is in any other state, the method throws an IllegalStateException.
If any of the client instance ids cannot be obtained within the timeout, the method throws `org.apache.kafka.common.errors.TimeoutException`.
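A sketch of the two rules above. The call is rejected unless the instance is RUNNING or REBALANCING, and one timeout is assumed to cover all internal clients together, so time spent waiting on one client is taken from what remains for the next. The `State` enum (mirroring Kafka Streams state names), the per-client `Function<Duration, UUID>` fetchers and `collectIds` are hypothetical; `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

final class StreamsInstanceIds {
    /** Hypothetical copy of the Kafka Streams state names. */
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    private StreamsInstanceIds() {}

    /** Each fetcher waits at most the Duration it is given, then returns the id or throws. */
    static Map<String, UUID> collectIds(State state,
                                        Map<String, Function<Duration, UUID>> clients,
                                        Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        if (state != State.RUNNING && state != State.REBALANCING) {
            throw new IllegalStateException("KafkaStreams is not running. State is " + state);
        }
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        Map<String, UUID> ids = new LinkedHashMap<>();
        for (Map.Entry<String, Function<Duration, UUID>> client : clients.entrySet()) {
            // Each client only gets what is left of the overall timeout (never negative).
            Duration remaining = Duration.ofNanos(Math.max(0L, deadlineNanos - System.nanoTime()));
            ids.put(client.getKey(), client.getValue().apply(remaining));
        }
        return ids;
    }
}
```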
The new interface `org.apache.kafka.streams.ClientInstanceIds` is defined as follows:
```java
package org.apache.kafka.streams;

import java.util.Map;
import org.apache.kafka.common.Uuid;

/**
 * Encapsulates the client instance ids used for metrics collection by
 * producers, consumers and admin clients used by Kafka Streams.
 */
public interface ClientInstanceIds {
    /**
     * Get the client instance id of the admin client.
     *
     * @return the client instance id
     *
     * @throws IllegalStateException If telemetry is disabled on the admin client.
     */
    Uuid adminInstanceId();

    /**
     * Get the client instance ids of the consumers.
     *
     * @return a map from thread key to client instance id
     */
    Map<String, Uuid> consumerInstanceIds();

    /**
     * Get the client instance ids of the producers.
     *
     * @return a map from thread key to client instance id
     */
    Map<String, Uuid> producerInstanceIds();
}
```
Finally, a new method is added to `org.apache.kafka.clients.CommonClientConfigs` to return the `ClientTelemetryReporter` if one is configured. This mirrors the similar `metricsReporters()` methods in that class.
```java
public static Optional<ClientTelemetryReporter> telemetryReporter(String clientId, AbstractConfig config);
```
Broker configuration
Configuration | Description | Values |
---|---|---|
telemetry.max.bytes | The maximum size (after compression if compression is used) of telemetry pushed from a client to the broker. | int, default: 1048576, valid values: [1,...] |
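A minimal sketch of how a broker might enforce `telemetry.max.bytes`, including the `[1,...]` range check on the configured value. The limit is checked against the payload as received (still compressed if the client compressed it), per the description, and is assumed to be inclusive. `TelemetrySizeLimit` is a hypothetical class, and the error returned to the client on rejection is not modelled.

```java
final class TelemetrySizeLimit {
    static final int DEFAULT_MAX_BYTES = 1_048_576; // documented default

    private final int maxBytes;

    TelemetrySizeLimit(int maxBytes) {
        // Valid values are [1,...].
        if (maxBytes < 1) {
            throw new IllegalArgumentException("telemetry.max.bytes must be at least 1, got " + maxBytes);
        }
        this.maxBytes = maxBytes;
    }

    /** Checks the payload as received, i.e. after compression if compression is used. */
    boolean accepts(byte[] receivedPayload) {
        return receivedPayload.length <= maxBytes;
    }
}
```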
...
Metric Name | Type | Group | Tags | Description |
---|---|---|---|---|
ClientMetricsInstanceCount | Gauge | ClientMetrics | | Current number of client metric instances being managed by the broker, i.e., the number of unique CLIENT_INSTANCE_IDs with an empty or non-empty subscription set. |
ClientMetricsUnknownSubscriptionRequestCount, ClientMetricsUnknownSubscriptionRequestRate | Meter | ClientMetrics | | Total number/rate of PushTelemetryRequests with an unknown subscription id. |
ClientMetricsThrottleCount, ClientMetricsThrottleRate | Meter | ClientMetrics | client_instance_id | Total number/rate of telemetry requests throttled because the client pushed at a higher rate than its PushIntervalMs allows. |
ClientMetricsPluginExportCount, ClientMetricsPluginExportRate | Meter | ClientMetrics | client_instance_id | Total number/rate of metrics requests pushed to metrics plugins, i.e., the number/rate of exportMetrics() calls. |
ClientMetricsPluginErrorCount, ClientMetricsPluginErrorRate | Meter | ClientMetrics | client_instance_id | Total number/rate of exceptions raised from the plugin's exportMetrics(). |
ClientMetricsPluginExportTimeAvg, ClientMetricsPluginExportTimeMax | Avg and Max | ClientMetrics | client_instance_id | Average and maximum time the broker spends invoking the plugin's exportMetrics() call. |
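The throttle metrics count pushes that arrive faster than a client's PushIntervalMs allows. A minimal sketch of that decision, assuming a push is throttled when it arrives less than PushIntervalMs after the last accepted push from the same client instance. `PushThrottler` and its methods are hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and a real broker may apply some tolerance that this sketch omits.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class PushThrottler {
    private final Map<UUID, Long> lastAcceptedPushMs = new HashMap<>();
    private long throttleCount; // would back a ClientMetricsThrottleCount-style counter

    /** Returns true if the push is accepted, false if it is throttled. */
    boolean onPush(UUID clientInstanceId, long nowMs, long pushIntervalMs) {
        Long last = lastAcceptedPushMs.get(clientInstanceId);
        if (last != null && nowMs - last < pushIntervalMs) {
            throttleCount++; // pushed sooner than the allowed interval
            return false;
        }
        lastAcceptedPushMs.put(clientInstanceId, nowMs);
        return true;
    }

    long throttleCount() {
        return throttleCount;
    }
}
```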
...
```
This tool helps to manipulate and describe client metrics configurations.
Option                                 Description
------                                 -----------
--alter                                Alter the configuration of the client
                                         metrics resource.
--block                                Block metrics collection.
--bootstrap-server <String: server to  REQUIRED: The Kafka server to connect to.
  connect to>
--command-config <String: command      Property file containing configs to be
  config property file>                  passed to Admin Client.
--delete                               Delete the configuration of the client
                                         metrics resource.
--describe                             List configurations for client metrics resources.
--generate-name                        Generate a UUID to use as the name.
--help                                 Print usage information.
--interval                             The metrics push interval in milliseconds.
--match                                Matching selector 'k1=v1,k2=v2'. The following
                                         is a list of valid selector names:
                                         client_instance_id
                                         client_id
                                         client_software_name
                                         client_software_version
                                         client_source_address
                                         client_source_port
--metrics                              Telemetry metric name prefixes 'm1,m2'.
--name <String>                        Name of client metrics configuration resource.
--version                              Display Kafka version.
```
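The `--match` option takes a comma-separated list of `name=value` selectors. A sketch of parsing such a string and validating the names against the selector list in the help output. `MatchSelectorParser` is hypothetical, and the naive split on `,` assumes no selector value contains a comma (a regex such as `a{1,3}` would break it); only the first `=` in each pair separates name from value.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class MatchSelectorParser {
    /** Selector names accepted by --match, as listed in the tool's help output. */
    static final Set<String> VALID_NAMES = Set.of(
            "client_instance_id", "client_id", "client_software_name",
            "client_software_version", "client_source_address", "client_source_port");

    private MatchSelectorParser() {}

    /** Parses "k1=v1,k2=v2" into an ordered map; assumes no value contains a comma. */
    static Map<String, String> parse(String match) {
        Map<String, String> selectors = new LinkedHashMap<>();
        for (String pair : match.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected name=value but got '" + pair + "'");
            }
            String name = pair.substring(0, eq).trim();
            if (!VALID_NAMES.contains(name)) {
                throw new IllegalArgumentException("Unknown selector name: " + name);
            }
            selectors.put(name, pair.substring(eq + 1)); // value is kept as a raw regex
        }
        return selectors;
    }
}
```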
...
```bash
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --alter --name MYMETRICS \
    --metrics org.apache.kafka.consumer. \
    --interval 60000 \
    --match "client_software_name=kafka_python,client_software_version=1\.2\..*"

$ kafka-configs.sh --bootstrap-server $BROKERS --alter --entity-type client-metrics --entity-name MYMETRICS \
    --add-config "metrics=org.apache.kafka.consumer.,interval.ms=60000,match=[client_software_name=kafka.python,client_software_version=1\.2\..*]"
```
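The selector values in the example above are regular expressions: `client_software_version=1\.2\..*` selects any 1.2.x client. A sketch of evaluating parsed selectors against a client's attributes, assuming every selector must match (logical AND) and each regex must match the whole attribute value (`Matcher.matches()` rather than `find()`). `SelectorMatcher` is a hypothetical class.

```java
import java.util.Map;
import java.util.regex.Pattern;

final class SelectorMatcher {
    private SelectorMatcher() {}

    /** True if every selector regex fully matches the corresponding client attribute. */
    static boolean matches(Map<String, String> selectors, Map<String, String> clientAttributes) {
        for (Map.Entry<String, String> selector : selectors.entrySet()) {
            String value = clientAttributes.get(selector.getKey());
            if (value == null || !Pattern.compile(selector.getValue()).matcher(value).matches()) {
                return false; // attribute missing or regex does not match the whole value
            }
        }
        return true; // in this sketch, an empty selector map matches every client
    }
}
```

Because values are regexes, the unescaped `.` in `kafka.python` in the `kafka-configs.sh` form matches any single character.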
Block an existing client metrics configuration resource from pushing metrics
Blocking metrics pushes for a client metrics configuration resource is achieved by setting its push interval to 0 ms.
```bash
$ kafka-client-metrics.sh --bootstrap-server $BROKERS --alter --name MYMETRICS --block

$ kafka-configs.sh --bootstrap-server $BROKERS --alter --entity-type client-metrics --entity-name MYMETRICS \
    --add-config "interval.ms=0"
```
Delete a client metrics configuration resource
...