...

Define a new interface, Monitorable, in package org.apache.kafka.common. Authorizer plugins can implement the interface, and the broker can then pass its Metrics instance to the Authorizer. Other broker or client plugins could potentially implement the interface in the future to get the broker's or client's Metrics instance and add metrics from their sub-components. Support for other plugins will be addressed in a later KIP if necessary; this KIP focuses on exposing Kafka Metrics in the Authorizer only.

Code Block
language: java
title: Broker Runtime Config
package org.apache.kafka.common;

import org.apache.kafka.common.metrics.Metrics;

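/**
 * Interface for plugins that want access to the {@link Metrics} instance.
 */
public interface Monitorable {
    /**
     * Receive the instance of {@link Metrics} from the broker or client.
     * The default implementation does nothing, so existing plugins are unaffected.
     */
    default void monitor(Metrics metrics) {
    }
}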
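
As a rough illustration of why monitor is a default method, the sketch below shows one plugin that ignores metrics and one that registers a metric. StubMetrics, MonitorableSketch, LegacyPlugin and MeteredPlugin are hypothetical stand-ins invented for this example so that it compiles without the Kafka client library; they are not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.metrics.Metrics.
// It only remembers the names of the metrics registered with it.
class StubMetrics {
    final List<String> registered = new ArrayList<>();

    void register(String name) {
        registered.add(name);
    }
}

// Same shape as the proposed Monitorable, written against the stand-in type.
interface MonitorableSketch {
    default void monitor(StubMetrics metrics) {
        // No-op by default, so plugins that do not care about metrics need no changes.
    }
}

// A plugin that does not override monitor: it inherits the no-op default.
class LegacyPlugin implements MonitorableSketch {
}

// A plugin that registers its own metric when handed the shared registry.
class MeteredPlugin implements MonitorableSketch {
    @Override
    public void monitor(StubMetrics metrics) {
        metrics.register("authorization-request-rate-per-minute");
    }
}

class MonitorableDemo {
    public static void main(String[] args) {
        StubMetrics metrics = new StubMetrics();
        new LegacyPlugin().monitor(metrics);  // registers nothing
        new MeteredPlugin().monitor(metrics); // registers one metric
        System.out.println(metrics.registered);
    }
}
```

With the real interface, the parameter type would be org.apache.kafka.common.metrics.Metrics rather than the stand-in.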


The following metrics would be added:

Metrics group name: "kafka.security.authorizer.metrics"
Metric "acls-total-count" reports the total number of ACLs defined.
Metric "authorization-request-rate-per-minute" reports the number of authorization requests per minute.
Metric "authorization-allowed-rate-per-minute" reports the number of allowed authorizations per minute.
Metric "authorization-denied-rate-per-minute" reports the number of denied authorizations per minute.
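
To make the relationship between the three rate metrics concrete, here is a simplified sketch: every authorization decision counts as one request and as exactly one of allowed or denied, and a per-minute rate is a count scaled by elapsed time. AuthorizationCounters is a hypothetical class for illustration only; it uses plain counters and a single elapsed interval, and does not reproduce the sampled windows that Kafka's metrics library uses.

```java
// Simplified illustration only; not how Kafka's metrics library computes rates.
class AuthorizationCounters {
    private long allowed;
    private long denied;

    // Each decision increments exactly one of the two counters.
    void record(boolean authorized) {
        if (authorized) {
            allowed++;
        } else {
            denied++;
        }
    }

    long allowed() {
        return allowed;
    }

    long denied() {
        return denied;
    }

    // Every decision is also a request, so requests = allowed + denied.
    long requests() {
        return allowed + denied;
    }

    // Events per minute over an elapsed interval given in milliseconds.
    static double perMinute(long count, long elapsedMs) {
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("elapsedMs must be positive");
        }
        return count * 60_000.0 / elapsedMs;
    }
}

class AuthorizationCountersDemo {
    public static void main(String[] args) {
        AuthorizationCounters counters = new AuthorizationCounters();
        counters.record(true);
        counters.record(true);
        counters.record(true);
        counters.record(false);
        // Four requests observed over two minutes.
        System.out.println(AuthorizationCounters.perMinute(counters.requests(), 120_000L));
    }
}
```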

Proposed Changes

The org.apache.kafka.server.authorizer.Authorizer interface needs to be updated to extend the Monitorable interface.

...

Code Block
language: scala
title: kafka.server.KafkaServer
/* Get the authorizer and initialize it if one is specified.*/
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
authorizer.foreach(_.monitor(metrics))
...
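
The call order in the snippet above (configure first, then monitor, both skipped when no authorizer is configured) can be sketched in Java as follows. RegistryStub, RecordingAuthorizer and BrokerStartupSketch are hypothetical names used only for this illustration; the real broker code is the Scala shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the broker's shared metrics registry.
class RegistryStub {
}

// Hypothetical stand-in for an authorizer; it records the lifecycle calls it receives.
class RecordingAuthorizer {
    final List<String> calls = new ArrayList<>();

    void configure(Map<String, Object> configs) {
        calls.add("configure");
    }

    void monitor(RegistryStub metrics) {
        calls.add("monitor");
    }
}

class BrokerStartupSketch {
    // Mirrors the order in the broker snippet: configure first, then monitor.
    static void start(Optional<RecordingAuthorizer> authorizer,
                      Map<String, Object> originals,
                      RegistryStub metrics) {
        authorizer.ifPresent(a -> a.configure(originals));
        authorizer.ifPresent(a -> a.monitor(metrics));
    }

    public static void main(String[] args) {
        RecordingAuthorizer authorizer = new RecordingAuthorizer();
        start(Optional.of(authorizer), Map.of(), new RegistryStub());
        System.out.println(authorizer.calls);

        // With no authorizer configured, both calls are skipped.
        start(Optional.empty(), Map.of(), new RegistryStub());
    }
}
```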

...


Code Block
language: scala
title: kafka.security.authorizer.AclAuthorizer
class AclAuthorizer extends Authorizer with Logging {
......
  // Set when the broker calls monitor(); stays null if monitor() is never invoked.
  private var authorizerMetrics: AuthorizerMetrics = _
......
  override def monitor(metrics: Metrics): Unit = {
    authorizerMetrics = new AuthorizerMetrics(metrics)
  }
......

  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    logAuditMessage(requestContext, action, authorized)
    authorizerMetrics.recordAuthorizerMetrics(authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }
...

  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"

    // Rate of allowed authorizations, per minute.
    val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    authorizationAllowedSensor.add(metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
      "The number of authorizations allowed per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Rate of denied authorizations, per minute.
    val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    authorizationDeniedSensor.add(metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
      "The number of authorizations denied per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Rate of all authorization requests, per minute.
    val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    authorizationRequestSensor.add(metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
      "The number of authorization requests per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Gauge reporting the current size of the ACL cache.
    metrics.addMetric(metrics.metricName("acls-total-count", GROUP_NAME, "The number of ACLs defined"), (config, now) => aclCache.size)

    // Record one request, plus either an allowed or a denied outcome.
    def recordAuthorizerMetrics(authorized: Boolean): Unit = {
      if (authorized) {
        authorizationAllowedSensor.record()
      } else {
        authorizationDeniedSensor.record()
      }
      authorizationRequestSensor.record()
    }
  }
}

...

Authorizer plugins create and configure their own Metrics and JmxReporter. With this alternative, Kafka metrics collected in the Authorizer plugin are kept separate from the broker's metrics, which leaves the two disconnected. Users would also need to implement and manage their own Metrics and JmxReporter, which takes extra effort.

Pass the instance of Metrics through the AuthorizerServerInfo interface. The Metrics instance should not be part of the Authorizer's server information. Using a dedicated interface to expose Kafka Metrics also allows other broker or client plugins to implement it in the future.