Status

Current state: Under Discussion

Discussion thread: here

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka exposes many pluggable APIs that let users bring their own custom plugins. For complex and critical plugins, it's important to have metrics to monitor their behavior. Plugins that want to emit metrics can use the Metrics class from the Kafka API, but a newly created Metrics instance inherits neither the tags of the component it depends on (for example, the producer for a custom partitioner) nor the registered metrics reporters. As most plugins are configurable, a workaround is to reimplement the metrics reporter logic, and in some cases the tag logic too, but that is cumbersome. Also, because these metrics live in a separate Metrics instance, they are separate from the client's metrics. When multiple clients run in the same JVM, for example multiple producers, it can be hard to identify which client some plugin metrics belong to.

This issue also applies to connectors and tasks in Kafka Connect. For example MirrorMaker2 creates its own Metrics object and has logic to add the metric reporters from the configuration.

In this proposal, a "plugin" is an interface users can implement and that is instantiated by Kafka. For example, a class implementing org.apache.kafka.server.policy.CreateTopicPolicy is considered a plugin as it's instantiated by brokers. On the other hand a class implementing org.apache.kafka.clients.producer.Callback is not considered a plugin as it's instantiated in user logic.

Public Interfaces

I propose introducing a new interface: Monitorable. If a plugin implements this interface, the withPluginMetrics() method will be called when the plugin is instantiated (after configure() if the plugin also implements Configurable). This will allow plugins to add their own metrics to the component (producer, consumer, etc.) that instantiated them.

Monitorable.java
package org.apache.kafka.common.metrics;

public interface Monitorable {

    /**
     * Called with the PluginMetrics instance of the client that instantiates the plugin.
     */
    void withPluginMetrics(PluginMetrics metrics);

}

The PluginMetrics interface has methods to add and remove metrics and sensors. Implementations will forward calls to the underlying Metrics instance. Plugins will only be able to remove metrics they created. Metrics created via this interface will have their group set to the class name of the plugin.

public interface PluginMetrics {

    /**
     * Create a MetricName with the given name, description and tags. The plugin class name will be used as the metric group.
     *
     * @param name        The name of the metric
     * @param description A human-readable description to include in the metric
     * @param tags        additional key/value attributes of the metric
     */
    MetricName metricName(String name, String description, Map<String, String> tags);

    /**
     * Add a metric to monitor an object that implements MetricValueProvider. This metric won't be associated with any
     * sensor. This is a way to expose existing values as metrics.
     *
     * @param metricName The name of the metric
     * @param metricValueProvider The metric value provider associated with this metric
     * @throws IllegalArgumentException if a metric with same name already exists.
     */
    void addMetric(MetricName metricName, MetricValueProvider<?> metricValueProvider);

    /**
     * Remove a metric if it exists and return it. Return null otherwise.
     *
     * @param metricName The name of the metric
     * @return the removed KafkaMetric or null if no such metric exists
     */
    KafkaMetric removeMetric(MetricName metricName);

    /**
     * Get or create a sensor with the given unique name.
     *
     * @param name The sensor name
     * @return The sensor
     */
    Sensor sensor(String name);

    /**
     * Remove a sensor (if it exists) and its associated metrics.
     *
     * @param name The name of the sensor to be removed
     */
    void removeSensor(String name);

    /**
     * Delete all metrics and sensors registered by this plugin
     */
    void close();
}
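
The ownership rule described above (a plugin can only remove metrics it created, and close() deletes everything it registered) could be implemented roughly as follows. This is a minimal sketch under stated assumptions, not the proposed implementation: SharedRegistry and OwnedMetrics are hypothetical stand-ins for the client's Metrics instance and for a PluginMetrics implementation, metric names are simplified to "group:name" strings, and the simple class name is assumed to be used as the group.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the client's shared Metrics instance.
class SharedRegistry {
    private final Map<String, Supplier<?>> metrics = new ConcurrentHashMap<>();

    void add(String name, Supplier<?> value) {
        if (metrics.putIfAbsent(name, value) != null) {
            throw new IllegalArgumentException("A metric named '" + name + "' already exists");
        }
    }

    Supplier<?> remove(String name) {
        return metrics.remove(name);
    }

    boolean contains(String name) {
        return metrics.containsKey(name);
    }
}

// Hypothetical per-plugin view: forwards to the shared registry and remembers what it created.
class OwnedMetrics implements AutoCloseable {
    private final SharedRegistry registry;
    private final String group;
    private final Set<String> owned = new HashSet<>();

    OwnedMetrics(SharedRegistry registry, Class<?> pluginClass) {
        this.registry = registry;
        // Assumption: the simple class name is used as the metric group.
        this.group = pluginClass.getSimpleName();
    }

    String metricName(String name) {
        return group + ":" + name;
    }

    synchronized void addMetric(String metricName, Supplier<?> value) {
        registry.add(metricName, value);
        owned.add(metricName);
    }

    // Returns null, and leaves the registry untouched, for metrics this plugin did not create.
    synchronized Supplier<?> removeMetric(String metricName) {
        return owned.remove(metricName) ? registry.remove(metricName) : null;
    }

    // Deletes every metric registered through this view, and nothing else.
    @Override
    public synchronized void close() {
        owned.forEach(registry::remove);
        owned.clear();
    }
}
```

In this sketch the thread-safety guarantee lives in the implementing class, since Java does not allow the synchronized modifier on interface method declarations.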

Proposed Changes

When a class is instantiated via the Utils.newInstance() helper methods, if it implements Monitorable and a Metrics object is available, a new PluginMetrics instance will be created and passed to the withPluginMetrics() method. This method will always be called after configure(). Metrics registered by plugins will inherit the prefix/namespace from the current Metrics instance, which is one of: kafka.producer, kafka.consumer, kafka.connect, kafka.streams and kafka.server. Metrics reporters should not implement the Monitorable interface as they are created before the Metrics instance.
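
The call order described above can be sketched as follows. This is an illustration only, not the actual Utils.newInstance() code: ConfigurableStub, PluginMetricsStub, MonitorableStub, PluginLoader and RecordingPlugin are hypothetical names introduced so the example compiles without Kafka on the classpath, and a null metrics argument models the case where no Metrics object is available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Configurable, PluginMetrics and Monitorable.
interface ConfigurableStub {
    void configure(Map<String, ?> configs);
}

interface PluginMetricsStub {
}

interface MonitorableStub {
    void withPluginMetrics(PluginMetricsStub metrics);
}

final class PluginLoader {
    private PluginLoader() {
    }

    // Instantiates the plugin, then calls configure() and withPluginMetrics(), in that order.
    static <T> T newPlugin(Class<T> type, Map<String, ?> configs, PluginMetricsStub metrics) {
        T plugin;
        try {
            plugin = type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + type.getName(), e);
        }
        if (plugin instanceof ConfigurableStub) {
            ((ConfigurableStub) plugin).configure(configs);
        }
        // Skipped when no metrics object is available or the plugin is not monitorable.
        if (metrics != null && plugin instanceof MonitorableStub) {
            ((MonitorableStub) plugin).withPluginMetrics(metrics);
        }
        return plugin;
    }
}

// Example plugin that records the order of the lifecycle calls it receives.
class RecordingPlugin implements ConfigurableStub, MonitorableStub {
    final List<String> calls = new ArrayList<>();

    @Override
    public void configure(Map<String, ?> configs) {
        calls.add("configure");
    }

    @Override
    public void withPluginMetrics(PluginMetricsStub metrics) {
        calls.add("withPluginMetrics");
    }
}
```

Because configure() runs first, a plugin in this sketch can use its configuration when deciding which metrics to register.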

Supported plugins

The goal is for all plugins, apart from MetricsReporter and KafkaMetricsReporter, to support this feature:

Broker
- KafkaPrincipalBuilder: principal.builder.class
- ReplicaSelector: replica.selector.class
- AlterConfigPolicy: alter.config.policy.class.name
- Authorizer: authorizer.class.name
- ClientQuotaCallback: client.quota.callback.class
- CreateTopicPolicy: create.topic.policy.class.name

Producer
- Serializer: key.serializer, value.serializer
- Partitioner: partitioner.class
- ProducerInterceptor: interceptor.classes

Consumer
- Deserializer: key.deserializer, value.deserializer
- ConsumerPartitionAssignor: partition.assignment.strategy
- ConsumerInterceptor: interceptor.classes

Connect
- Converter: key.converter, value.converter
- ConnectorClientConfigOverridePolicy: connector.client.config.override.policy
- HeaderConverter: header.converter
- ConnectRestExtension: rest.extension.classes
- Connector
- Task
- Transformation
- Predicate

Streams
- DeserializationExceptionHandler: default.deserialization.exception.handler
- Serde: default.key.serde, default.list.key.serde.inner, default.list.key.serde.type, default.list.value.serde.inner, default.list.value.serde.type, windowed.inner.class.serde
- ProductionExceptionHandler: default.production.exception.handler
- TimestampExtractor: default.timestamp.extractor
- RocksDBConfigSetter: rocksdb.config.setter

Common
- ConfigProvider: config.providers
- AuthenticateCallbackHandler: sasl.client.callback.handler.class, sasl.login.callback.handler.class
- Login: sasl.login.class
- SslEngineFactory: ssl.engine.factory.class
- SecurityProviderCreator: security.providers

MirrorMaker

- ReplicationPolicy: replication.policy.class
- ConfigPropertyFilter: config.property.filter.class
- GroupFilter: group.filter.class
- TopicFilter: topic.filter.class
- ForwardingAdmin: forwarding.admin.class

Example usage

For example, if we create a custom ProducerInterceptor:

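import java.util.Collections;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics; // assumed to live alongside Monitorable
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Rate;

public class MyInterceptor<K, V> implements ProducerInterceptor<K, V>, Monitorable {

    private Sensor sensor;

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {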
        sensor = metrics.sensor("onSend");
        MetricName rate = metrics.metricName("rate", "Average number of calls per second.", Collections.emptyMap());
        MetricName total = metrics.metricName("total", "Total number of calls.", Collections.emptyMap());
        sensor.add(rate, new Rate());
        sensor.add(total, new CumulativeCount());
    }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        sensor.record();
        return record;
    }
    
    ...
}

If the producer using this plugin has its client-id set to producer-1, the metrics created by this plugin will have the following name: kafka.producer:type=MyInterceptor,client-id=producer-1 and these attributes: rate and total.
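
To make the naming above concrete, the following sketch shows how such a name could be assembled from the namespace, the group and the tags. MBeanNames and mbeanName are hypothetical helpers written for this illustration; the real reporter logic may differ, for example by quoting special characters, which this simplified version does not do.

```java
import java.util.Map;

final class MBeanNames {
    private MBeanNames() {
    }

    // Builds "<prefix>:type=<group>,<tag>=<value>,..." with tags in the map's iteration order.
    static String mbeanName(String prefix, String group, Map<String, String> tags) {
        StringBuilder name = new StringBuilder(prefix).append(":type=").append(group);
        tags.forEach((key, value) -> name.append(',').append(key).append('=').append(value));
        return name.toString();
    }
}
```

Calling mbeanName("kafka.producer", "MyInterceptor", Collections.singletonMap("client-id", "producer-1")) yields kafka.producer:type=MyInterceptor,client-id=producer-1, matching the name given above.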

Compatibility, Deprecation, and Migration Plan

This is a new feature so it has no impact on deprecation and does not need a migration plan. Plugins should be able to function even if their withPluginMetrics() method is not called.
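
One way a plugin could tolerate withPluginMetrics() never being called is to treat its sensor as optional. The sketch below is illustrative only: CountingSensor and TolerantPlugin are hypothetical stand-ins (not Kafka classes), withSensor() plays the role of withPluginMetrics(), and records are simplified to strings.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a Sensor: counts record() calls.
class CountingSensor {
    final AtomicLong count = new AtomicLong();

    void record() {
        count.incrementAndGet();
    }
}

// Hypothetical interceptor-like plugin whose metrics are optional.
class TolerantPlugin {
    // Stays null if the runtime never hands the plugin a sensor.
    private volatile CountingSensor sensor;

    void withSensor(CountingSensor sensor) {
        this.sensor = sensor;
    }

    String onSend(String record) {
        CountingSensor current = sensor;
        if (current != null) {
            current.record(); // only record when metrics were provided
        }
        return record;
    }
}
```

With this guard, the plugin's core behavior is the same whether or not metrics were provided.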

Test Plan

This feature will be tested using unit and integration tests.

Rejected Alternatives

  • Create a dedicated Metrics instance for plugins: A dedicated instance could have its own prefix/namespace (for example kafka.consumer.plugins). This would allow grouping metrics from all plugins but it requires instantiating another Metrics instance and new metrics reporters.
  • Let plugins create their own Metrics instance: Instead of passing the Metrics instance to plugins we could pass all the values necessary (metrics reporters, configs, etc ...) to create and configure a Metrics instance. This is impractical as it requires passing a lot of values around and plugins still have to have logic to use them.
  • Provide the Metrics instance to Kafka Connect Connectors and Tasks via their context: Using a different mechanism that could introduce compatibility issues. Connectors and Tasks also