Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Kafka exposes many pluggable API for users to bring their custom plugins. For complex and critical plugins it's important to have metrics to monitor their behavior. Plugins wanting to emit metrics can use the Metrics class from the Kafka API but when creating a new Metrics instance it does not inherits the tags from the component it depends on (for example from a producer for a custom partitioner), or the registered metrics reporters. As most plugins are configurable, a workaround is to reimplement the metric reporters logic and in some case for tags too but that is cumbersome. Also by creating a separate Metrics instance, these metrics are separate from the client's and in case multiple clients are running in the same JVM, for example multiple producers, it can be hard to identify the specific client that is associated with some plugin metrics.

This issue also applies to connectors and tasks in Kafka Connect. For example MirrorMaker2 creates its own Metrics object and has logic to add the metric reporters from the configuration.

In this proposal, a "plugin" is an interface users can implement and that is instantiated by Kafka. For example, a custom CreateTopicPolicy is considered a plugin as it's instantiated by brokers. On the other hand a class implementing Callback is not considered a plugin as it's instantiated in user logic.

Public Interfaces

I propose introducing a new interface: Monitorable. If a plugin implements this interface, the withPluginMetrics() method will be called when the plugin is instantiated (after configure() if the plugin also implements Configurable). This will allow plugins to adds their own metrics to the component (producer, consumer, etc) that instantiated them.

...

When instantiating a class via the Utils.newInstance() helper methods, if it implements Monitorable and a Metrics object is available, a new PluginMetrics instance will be created and passed to the withPluginMetrics() method. It will be always called after configure().  Metrics registered by plugins will inherit the prefix/namespace from the current Metrics instance, these are: kafka.producer, kafka.consumer, kafka.connect, kafka.streams and kafka.server. Metrics reporters should not implement the Monitorable interface as they are created before the Metrics instance.

Supported plugins

The goal is for all plugins, apart from MetricsReporter and KafkaMetricsReporter, to support this feature:

Broker
- KafkaPrincipalBuilder: principal.builder.class
- ReplicaSelector: replica.selector.class
- AlterConfigPolicy: alter.config.policy.class.name
- Authorizer: authorizer.class.name
- ClientQuotaCallback: client.quota.callback.class
- CreateTopicPolicy: create.topic.policy.class.name

Producer
- Serializer: key.serializer, value.serializer
- Partitioner: partitioner.class
- ProducerInterceptor: interceptor.classes

Consumer
- Deserializer: key.deserializer, value.deserializer
- ConsumerPartitionAssignor: partition.assignment.strategy
- ConsumerInterceptor: interceptor.classes

Connect
- Converter: key.converter, value.converter
- ConnectorClientConfigOverridePolicy: connector.client.config.override.policy
- HeaderConverter: header.converter
- ConnectRestExtension: rest.extension.classes
- Connector
- Task
- Transformation
- Predicate

Streams
- DeserializationExceptionHandler: default.deserialization.exception.handler
- Serde: default.key.serde, default.list.key.serde.inner, default.list.key.serde.type, default.list.value.serde.inner, default.list.value.serde.type, windowed.inner.class.serde
- ProductionExceptionHandler: default.production.exception.handler
- TimestampExtractor: default.timestamp.extractor
- RocksDBConfigSetter: rocksdb.config.setter

Common
- ConfigProvider: config.providers
- AuthenticateCallbackHandler: sasl.client.callback.handler.class, sasl.login.callback.handler.class
- Login: sasl.login.class
- SslEngineFactory: ssl.engine.factory.class
- SecurityProviderCreator: security.providers

MirrorMaker
- ConfigPropertyFilter: config.property.filter.class
- GroupFilter: group.filter.class
- TopicFilter: topic.filter.class
- ForwardingAdmin: forwarding.admin.class

Example usage

For example if we create a custom ProducerInterceptor

...