...

Code Block
languagejava
import java.util.Map;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Sensor;

/**
 * Lets a plugin register metrics. An instance is backed by the underlying Metrics repository
 * and is created for the class name of the plugin.
 */
public interface PluginMetrics {

    /**
     * Create a MetricName with the given name, description and tags. The plugin class name will be used as the metric group.
     *
     * @param name        The name of the metric
     * @param description A human-readable description to include in the metric
     * @param tags        Additional key/value attributes of the metric
     * @return the MetricName, with the plugin class name as its group
     */
    MetricName metricName(String name, String description, Map<String, String> tags);

    /**
     * Add a metric to monitor an object that implements MetricValueProvider. This metric won't be associated with any
     * sensor. This is a way to expose existing values as metrics.
     *
     * @param metricName The name of the metric
     * @param metricValueProvider The metric value provider associated with this metric
     * @throws IllegalArgumentException if a metric with the same name already exists
     */
    void addMetric(MetricName metricName, MetricValueProvider<?> metricValueProvider);

    /**
     * Remove a metric if it exists and return it. Return null otherwise.
     *
     * @param metricName The name of the metric
     * @return the removed KafkaMetric or null if no such metric exists
     */
    KafkaMetric removeMetric(MetricName metricName);

    /**
     * Get or create a sensor with the given unique name.
     *
     * @param name The sensor name
     * @return The sensor
     */
    Sensor sensor(String name);

    /**
     * Remove a sensor (if it exists) and its associated metrics.
     *
     * @param name The name of the sensor to be removed
     */
    void removeSensor(String name);

    /**
     * Delete all metrics and sensors registered by this plugin.
     */
    void close();
}
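
The contract above implies that each PluginMetrics instance has to remember what its plugin registered, so that close() can delete exactly those entries and nothing else. A minimal sketch of that bookkeeping follows. It deliberately uses simplified stand-ins instead of the Kafka classes: SimpleRegistry and TrackingPluginMetrics are hypothetical names, metric names are plain strings, and values are java.util.function.Supplier instances. It is an illustration of the idea, not the proposed implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for a shared metrics repository (not a Kafka class).
class SimpleRegistry {
    final Map<String, Supplier<?>> metrics = new HashMap<>();

    void add(String name, Supplier<?> value) {
        // Mirror the documented behaviour: duplicate names are rejected.
        if (metrics.putIfAbsent(name, value) != null) {
            throw new IllegalArgumentException("Metric already exists: " + name);
        }
    }

    void remove(String name) {
        metrics.remove(name);
    }
}

// Hypothetical per-plugin view that tracks the metrics it registered.
class TrackingPluginMetrics implements AutoCloseable {
    private final SimpleRegistry registry;
    private final String group;
    private final Set<String> owned = new HashSet<>();

    TrackingPluginMetrics(SimpleRegistry registry, String className) {
        this.registry = registry;
        this.group = className;
    }

    // Use the plugin class name as the group, as the Javadoc above describes.
    String metricName(String name) {
        return group + ":" + name;
    }

    synchronized void addMetric(String metricName, Supplier<?> value) {
        registry.add(metricName, value); // throws before we record ownership
        owned.add(metricName);
    }

    // Returns true if this plugin owned the metric and it was removed.
    synchronized boolean removeMetric(String metricName) {
        if (owned.remove(metricName)) {
            registry.remove(metricName);
            return true;
        }
        return false;
    }

    // Delete everything this plugin registered; other entries stay untouched.
    @Override
    public synchronized void close() {
        for (String name : owned) {
            registry.remove(name);
        }
        owned.clear();
    }
}
```

Tracking ownership per instance keeps close() cheap and avoids scanning the shared repository for names that happen to match the plugin's group.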

Proposed Changes

When a class is instantiated via the Utils.newInstance() helper methods, if it implements Monitorable and a Metrics object is available, a new PluginMetrics instance is created and passed to its withPluginMetrics() method. withPluginMetrics() is always called after configure(). Metrics registered by plugins inherit the prefix/namespace of the current Metrics instance, which is one of kafka.producer, kafka.consumer, kafka.connect, kafka.streams or kafka.server. Metrics reporters should not implement the Monitorable interface because they are created before the Metrics instance.
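
The ordering described above (instantiate, then configure(), then withPluginMetrics() only when the plugin is Monitorable and metrics are available) can be sketched as follows. This is an illustration under stated assumptions, not the actual Utils.newInstance() code: Configurable, PluginMetrics and Monitorable are reduced to minimal stand-in interfaces, PluginLoader and RecordingPlugin are hypothetical names, and a Supplier replaces reflective instantiation to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Minimal stand-ins for the real interfaces (assumptions for this sketch).
interface Configurable {
    void configure(Map<String, ?> configs);
}

interface PluginMetrics {
}

interface Monitorable {
    void withPluginMetrics(PluginMetrics metrics);
}

// Hypothetical loader showing the call order only.
final class PluginLoader {
    private PluginLoader() {
    }

    static <T> T newInstance(Supplier<T> factory, Map<String, ?> configs, PluginMetrics metricsOrNull) {
        T plugin = factory.get();
        // Step 1: configure() always runs first.
        if (plugin instanceof Configurable) {
            ((Configurable) plugin).configure(configs);
        }
        // Step 2: hand over metrics only if the plugin opted in and metrics exist.
        if (plugin instanceof Monitorable && metricsOrNull != null) {
            ((Monitorable) plugin).withPluginMetrics(metricsOrNull);
        }
        return plugin;
    }
}

// Hypothetical plugin that records the order in which it was called.
class RecordingPlugin implements Configurable, Monitorable {
    final List<String> calls = new ArrayList<>();

    @Override
    public void configure(Map<String, ?> configs) {
        calls.add("configure");
    }

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        calls.add("withPluginMetrics");
    }
}
```

Because withPluginMetrics() runs after configure(), a plugin can use its configuration (for example, to choose metric tags) when registering metrics. A plugin created without an available metrics object, as the text notes for metrics reporters, simply never receives the second call.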

...

Common

- ConfigProvider: config.providers
- AuthenticateCallbackHandler: sasl.client.callback.handler.class, sasl.login.callback.handler.class
- Login: sasl.login.class
- SslEngineFactory: ssl.engine.factory.class
- SecurityProviderCreator: security.providers

MirrorMaker

- ReplicationPolicy: replication.policy.class
- ConfigPropertyFilter: config.property.filter.class
- GroupFilter: group.filter.class
- TopicFilter: topic.filter.class
- ForwardingAdmin: forwarding.admin.class

...

Code Block
languagejava
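import java.util.Collections;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Rate;
// Monitorable and PluginMetrics are the interfaces introduced by this proposal.

public class MyInterceptor<K, V> implements ProducerInterceptor<K, V>, Monitorable {

    private Sensor sensor;

    // Called after configure(); registers a sensor with a rate and a total count.
    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        sensor = metrics.sensor("onSend");
        MetricName rate = metrics.metricName("rate", "Average number of calls per second.", Collections.emptyMap());
        MetricName total = metrics.metricName("total", "Total number of calls.", Collections.emptyMap());
        sensor.add(rate, new Rate());
        sensor.add(total, new CumulativeCount());
    }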

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        sensor.record();
        return record;
    }
    
    ...
}

...