Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

As a GetTelemetrySubscriptionsRequest is received for a previously unknown client instance id, the CLIENT_METRICS config cache is scanned for any configured metric subscriptions whose match selectors match that of the client. The resulting matching configuration entries are compiled into a list of subscribed metrics which is returned in GetTelemetrySubscriptionsResponse.RequestedMetrics along with the minimum configured collection interval (this can be improved in future versions by including a per-metric interval so that each subscribed metric is collected with its configured interval, but in its current form longer-interval metrics are included “for free” if there are shorter-interval metrics in the subscription set). The CRC32 CRC32C checksum is also calculated based on the compiled metrics and is returned as the SubscriptionId in the response, as well as stored in the per-client-instance cache on the broker to track configuration changes.

...

The SubscriptionId is a unique identifier for a client instance's subscription set, the id is generated by calculating a CRC32 CRC32C of the configured metrics subscriptions matching the given client including the PushIntervalMs, XORed with the ClientInstanceId. This SubscriptionId is returned to the client in the GetTelemetrySubscriptionsResponse and the client passes that SubscriptionId in each subsequent PushTelemetryRequest for those received metrics. If the configured subscriptions are updated and resulting in a change for a client instance, the SubscriptionId is recalculated. Upon the next PushTelemetryRequest using the previous SubscriptionId, the broker will find that the received and expected SubscriptionIds differ and it will return UNKNOWN_SUBSCRIPTION_ID  back to the client. When a client receives this error code it will immediately send a GetTelemetrySubscriptionsRequest to retrieve the new subscription set along with a new SubscriptionId.

...

Code Block
/**
 * @return the client's assigned instance id used for metrics collection.
 * @throws InterruptException       If the thread is interrupted while blocked.
 * @throws TimeoutException         Indicates that a request timed out
 */
public Uuid clientInstanceId(Duration timeout); @throws KafkaException           If an unexpected error occurs while trying to determine the client
 *                                  instance ID, though this error does not necessarily imply the
 *                                  consumer client is otherwise unusable.
 * @throws IllegalArgumentException If the {@code timeout} is negative.
 * @throws IllegalStateException    If telemetry is not enabled ie, config `{@code enable.metrics.push}`
 *                                  is set to `{@code false}`.
 */
public Uuid clientInstanceId(Duration timeout);

If the client has not yet requested a client instance id from the broker, If the client has not yet requested a client instance id from the broker, this call may block up to the duration of the timeout. In the event that the client instance id cannot be obtained within the timeout, the method throws org.apache.kafka.common.errors.TimeoutException .

In addition, the following method is added to the KafkaStreams interface to give access to the client instance ids of the producers, consumers and admin clients used by Kafka Streams:

Code Block
/**
 * @return The theinternal clients' assigned instance ids used for metrics collection.
 *
 * @throws IllegalStateException    Indicates that Kafka Streams IllegalArgumentException If {@code timeout} is negative.
 * @throws IllegalStateException If {@code KafkaStreams} is not running.
 * @throws TimeoutException Indicates that  a  request timed out.
 * @throws Indicates that a request timed outStreamsException For any other error that might occur.
 */
public ClientInstanceIds clientInstanceIds(Duration timeout);

...

Code Block
/**
 * Encapsulates the client instance ids used for metrics collection by
 * producers, consumers and admin clients used by Kafka Streams.
 */
public interface ClientInstanceIds {
  /**
   * Get the client instance id of the admin client
   *
   * @return the client instance id
   *
   * @throws IllegalStateException If telemetry is disabled on the admin client.
   */
  Uuid adminInstanceId();

  /**
   * Get the client instance ids of the consumers
   *
   * @return a map from thread key to client instance id
   */
  Map<String, Uuid> consumerInstanceIds();

  /**
   * Get the client instance ids of the producers
   *
   * @return a map from thread key to client instance id
   */
  Map<String, Uuid> producerInstanceIds();
}

...