Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added KafkaStreams method to obtain instance ids

...

The following method is added to the Producer, Consumer, and Admin client interfaces:

Code Block
/**
 * @returns@return the client's assigned instance id used for metrics collection.
 */
public String clientInstanceId(Duration timeout);

If the client has not yet requested a client instance id from the broker, this call may block up to the duration of the timeout. If no client instance id can be retrieved within the timeout, an error is returned (such as timeout, feature not supported by broker, auth failure).

In addition, the following method is added to the KafkaStreams interface to give access to the client instance ids of the producers, consumers and admin clients used by Kafka Streams:

Code Block
/**
 * @return the clients' assigned instance ids used for metrics collection.
 */
public ClientsInstanceIds clientsInstanceId(Duration timeout);

The new class org.apache.kafka.streams.ClientsInstanceIds is defined as follows:

Code Block
/**
 * Encapsulates the client instance ids used for metrics collection by
 * producers, consumers and admin clients used by Kafka Streams.
 */
public class ClientsInstanceIds {
  /**
   * Get the client instance id of the admin client
   *
   * @return the client instance id 
   */
  String adminInstanceId();

   /**
   * Get the client instance id of the global consumer
   *
   * @return the client instance id, if any
   */
  Optional<String> globalConsumerInstanceId();

  /**
   * Get the client instance ids of the main consumers
   *
   * @return a map from thread key to client instance id
   */
  Map<String, String> mainConsumerInstanceIds();

  /**
   * Get the client instance ids of the restore consumers
   *
   * @return a map from thread key to client instance id
   */
  Map<String, String> restoreConsumerInstanceIds();

  /**
   * Get the client instance id of the producers
   *
   * @return a map from thread key to client instance id
   */
  Map<String, String> producerInstanceIds();
}

Broker configuration

ConfigurationDescriptionValues
telemetry.max.bytes The maximum size (after compression if compression is used) of telemetry pushed from a client to the broker.int, default: 1048576, valid values: [1,...]

...

In network environments where there are network proxies (such as Kubernetes ingress) on the path between the client and broker, it may be problematic obtaining the originating client's IP address. One way to address this in the future would be to support the PROXY protocol in Kafka.

Configuration properties or extensions to the Metrics plugin interface on the broker to change the temporality is outside the scope of this KIP and may be addressed at a later time as the need arises. For example, if a metrics back-end has a preference for a particular temporality, it may be helpful to let it indicate that using the Metrics plugin interface so that the broker can use this temporality when requesting metrics from the clients.

Compatibility, Deprecation, and Migration Plan

...