...

Monitorable.java
package org.apache.kafka.common.metrics;

public interface Monitorable {

    /**
     * Get the PluginMetrics instance from the client that instantiates the plugin.
     */
    void withPluginMetrics(PluginMetrics metrics);

}

The PluginMetrics interface has methods to add and remove metrics and sensors. It will forward calls to the underlying Metrics instance. Plugins will only be able to remove metrics and sensors they created. Metrics created via this interface will have their group set to "plugins" and include tags that uniquely identify the plugin.

public interface PluginMetrics extends Closeable {

    /**
     * Name of the tag for the plugin identifier.
     */
    String PLUGIN_ID_TAG = "plugin-id";

    /**
     * Create a {@link MetricName} with the given name, description and tags. The group will be set to "plugins".
     * Tags to uniquely identify the plugins are automatically added to the provided tags.
     *
     * @param name        The name of the metric
     * @param description A human-readable description to include in the metric
     * @param tags        additional key/value attributes of the metric
     */
    MetricName metricName(String name, String description, Map<String, String> tags);

    /**
     * Add a metric to monitor an object that implements {@link MetricValueProvider}. This metric won't be associated with any
     * sensor. This is a way to expose existing values as metrics.
     *
     * @param metricName The name of the metric
     * @param metricValueProvider The metric value provider associated with this metric
     * @throws IllegalArgumentException if a metric with same name already exists.
     */
    void addMetric(MetricName metricName, MetricValueProvider<?> metricValueProvider);

    /**
     * Remove a metric if it exists.
     *
     * @param metricName The name of the metric
     */
    void removeMetric(MetricName metricName);

    /**
     * Create a sensor with the given unique name.
     *
     * @param name The sensor name
     * @return The sensor
     * @throws IllegalArgumentException if a sensor with same name already exists.
     */
    Sensor sensor(String name);

    /**
     * Remove a sensor (if it exists) and its associated metrics.
     *
     * @param name The name of the sensor to be removed
     */
    void removeSensor(String name);
}

The PluginMetrics interface extends Closeable. Calling the close() method removes all metrics and sensors created by this plugin.
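The ownership rule described above (a plugin can only remove what it created, and close() removes everything it created) can be illustrated with a minimal sketch. This is not the proposed implementation: ScopedPluginMetrics is a hypothetical class, metric names are simplified to plain strings, and a shared map stands in for the client's Metrics instance.

```java
import java.io.Closeable;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical, simplified model: the shared registry stands in for the
// client's Metrics instance and metric names are plain strings.
final class ScopedPluginMetrics implements Closeable {
    private final Map<String, Supplier<?>> registry;   // shared with the client
    private final Set<String> owned = new HashSet<>(); // names created via this instance

    ScopedPluginMetrics(Map<String, Supplier<?>> registry) {
        this.registry = registry;
    }

    synchronized void addMetric(String name, Supplier<?> valueProvider) {
        // Reject duplicates, mirroring the IllegalArgumentException in the Javadoc.
        if (registry.putIfAbsent(name, valueProvider) != null) {
            throw new IllegalArgumentException("A metric named '" + name + "' already exists");
        }
        owned.add(name);
    }

    synchronized void removeMetric(String name) {
        // Only metrics created through this instance are removed; others are left alone.
        if (owned.remove(name)) {
            registry.remove(name);
        }
    }

    @Override
    public synchronized void close() {
        // Remove everything this plugin registered, and nothing else.
        for (String name : owned) {
            registry.remove(name);
        }
        owned.clear();
    }
}
```

In this sketch, a metric registered directly by the client survives both removeMetric() and close() calls made by the plugin, because it was never added to the owned set.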

Proposed Changes

When instantiating a class via the Utils.newInstance() helper methods, if it implements Monitorable and a Metrics object is available, a new PluginMetrics instance will be created and passed to the withPluginMetrics() method. If the class is also Configurable, withPluginMetrics() will always be called after configure(). Metrics registered by plugins will inherit the prefix/namespace from the current Metrics instance; these are: kafka.producer, kafka.consumer, kafka.connect, kafka.streams and kafka.server. Metrics reporters should not implement the Monitorable interface as they are created before the Metrics instance. Tags will be added to metrics and sensors created via the PluginMetrics interface to uniquely identify each instance.
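The call ordering described above can be sketched as follows. The interfaces are simplified stand-ins declared only so the example compiles on its own, and PluginLoader and RecordingPlugin are hypothetical names; this is not the code of Utils.newInstance().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the Kafka interfaces (assumptions for this sketch).
interface Configurable { void configure(Map<String, ?> configs); }
interface PluginMetrics { }
interface Monitorable { void withPluginMetrics(PluginMetrics metrics); }

// Hypothetical helper illustrating the ordering; not Kafka's Utils.newInstance().
final class PluginLoader {
    static <T> T instantiate(Class<T> type, Map<String, ?> configs, PluginMetrics metrics) {
        final T plugin;
        try {
            plugin = type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + type.getName(), e);
        }
        // configure() always runs first ...
        if (plugin instanceof Configurable) {
            ((Configurable) plugin).configure(configs);
        }
        // ... and withPluginMetrics() only runs when metrics are available.
        if (plugin instanceof Monitorable && metrics != null) {
            ((Monitorable) plugin).withPluginMetrics(metrics);
        }
        return plugin;
    }
}

// Hypothetical plugin that records the order in which it was called.
final class RecordingPlugin implements Configurable, Monitorable {
    final List<String> calls = new ArrayList<>();

    RecordingPlugin() { }

    @Override
    public void configure(Map<String, ?> configs) { calls.add("configure"); }

    @Override
    public void withPluginMetrics(PluginMetrics metrics) { calls.add("withPluginMetrics"); }
}
```

Because withPluginMetrics() may be skipped when no metrics object is available, a plugin written against this contract should not assume the method has been called.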

For all plugins apart from Connectors, Tasks, Converters, Transformations and Predicates, a tag containing the configuration name (config) will be added. For example, metrics registered by a custom Serializer named MySerializer configured via key.serializer will have the following name: kafka.producer:type=plugins,client-id=producer-1,class=MySerializer,config=key.serializer

For Connectors and Converters, the name of the connector (connector) will be added as a tag. Tasks will have the connector name and the task id (task) added as tags. Transformations and Predicates will have the connector name, the task id and their alias (alias) added as tags. For example, for a task: kafka.connect:type=plugins,class=MyTask,connector=my-sink,task=0
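To make the naming scheme concrete, here is a small sketch that assembles names in the shape shown in the examples above. PluginMetricNames and its methods are hypothetical helpers written for illustration; the real names are produced by the metrics reporters, whose tag ordering and escaping rules are not modelled here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper; it only mirrors the shape of the example names.
final class PluginMetricNames {

    // Builds an insertion-ordered tag map from alternating keys and values.
    static Map<String, String> tags(String... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException("Expected key/value pairs");
        }
        Map<String, String> tags = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            tags.put(keyValues[i], keyValues[i + 1]);
        }
        return tags;
    }

    // namespace: prefix inherited from the Metrics instance, e.g. kafka.producer
    // clientTags: tags already set by the client, e.g. client-id
    // pluginTags: tags added to identify the plugin, e.g. class, config, connector, task
    static String name(String namespace, Map<String, String> clientTags, Map<String, String> pluginTags) {
        Map<String, String> all = new LinkedHashMap<>();
        all.put("type", "plugins"); // the group is always "plugins"
        all.putAll(clientTags);
        all.putAll(pluginTags);
        return namespace + ":" + all.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }
}
```

For instance, name("kafka.connect", tags(), tags("class", "MyTask", "connector", "my-sink", "task", "0")) yields the task name shown above.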

For configurations that accept a list of classes, for example interceptor.classes, if the same class is provided multiple times, its metrics may collide. This is deemed highly unlikely to occur as there are no use cases where providing the same class multiple times is useful.

Supported plugins

The goal is for all plugins, apart from MetricsReporter and KafkaMetricsReporter (as these plugins are created before the Metrics instance), to support this feature:

...

If the producer using this plugin has its client-id set to producer-1, the metrics created by this plugin will have the following name: kafka.producer:type=plugins,client-id=producer-1,class=MyInterceptor,config=interceptor.classes and these attributes: rate and total.

...

This is a new feature so it has no impact on deprecation and does not need a migration plan. Plugins should be able to function even if their withPluginMetrics() method is not called.

...

  • Create a dedicated Metrics instance for plugins: A dedicated instance could have its own prefix/namespace (for example kafka.consumer.plugins). This would allow grouping metrics from all plugins but it requires instantiating another Metrics instance and new metrics reporters.
  • Let plugins create their own Metrics instance: Instead of passing the Metrics instance to plugins we could pass all the values necessary (metrics reporters, configs, etc.) to create and configure a Metrics instance. This is impractical as it requires passing a lot of values around and plugins would still need logic to use them.
  • Provide the Metrics instance to Kafka Connect Connectors and Tasks via their context: We would have 2 different mechanisms, one for Connectors/Tasks and one for all other plugins, for the same feature. Also, using the Connector and Task contexts has an impact on compatibility.