Status

Current state: Accepted

Discussion thread: here 

Vote thread: here 

JIRA: KAFKA-9958

...

Code Block
languagejava
titleMonitorable Interface
package org.apache.kafka.common;

import org.apache.kafka.common.metrics.Metrics;

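/**
 * Interface for plugins that need access to the {@link Metrics} instance.
 */
public interface Monitorable {
    /**
     * Receive the {@link Metrics} instance so the plugin can register its own metrics.
     * The default implementation does nothing.
     */
    default void monitor(Metrics metrics) {
    }
}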
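
Because monitor is a default method, plugins written before the interface existed should keep compiling and simply ignore the metrics instance. The sketch below illustrates that design choice using only the JDK. StubMetrics, MonitorableSketch, LegacyPlugin, MeteredPlugin and MonitorableDemo are hypothetical stand-ins invented for this illustration; they are not Kafka classes, and the real interface takes org.apache.kafka.common.metrics.Metrics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.metrics.Metrics.
// It only remembers the names of the metrics registered with it.
class StubMetrics {
    final List<String> registered = new ArrayList<>();

    void register(String name) {
        registered.add(name);
    }
}

// Same shape as the proposed Monitorable, but written against the stub type.
interface MonitorableSketch {
    default void monitor(StubMetrics metrics) {
        // No-op by default, so existing plugins need no changes.
    }
}

// A plugin that predates the interface: it inherits the no-op default.
class LegacyPlugin implements MonitorableSketch {
}

// A plugin that opts in and registers one metric of its own.
class MeteredPlugin implements MonitorableSketch {
    @Override
    public void monitor(StubMetrics metrics) {
        metrics.register("acls-total-count");
    }
}

class MonitorableDemo {
    // Hands a fresh stub registry to the plugin and returns what it registered.
    static List<String> wire(MonitorableSketch plugin) {
        StubMetrics metrics = new StubMetrics();
        plugin.monitor(metrics);
        return metrics.registered;
    }

    public static void main(String[] args) {
        System.out.println(wire(new LegacyPlugin()));  // prints []
        System.out.println(wire(new MeteredPlugin())); // prints [acls-total-count]
    }
}
```

The legacy plugin registers nothing and the metered plugin registers one name, which is the behaviour the default method is meant to allow.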


The org.apache.kafka.server.authorizer.Authorizer interface needs to be updated to extend the Monitorable interface.

Code Block
languagejava
titleAuthorizer Interface
package org.apache.kafka.server.authorizer;
...
import org.apache.kafka.common.Monitorable;
...
public interface Authorizer extends Configurable, Closeable, Monitorable {
...
}


The following metrics will be added:

Full Name | Type | Description
kafka.server:type=kafka.security.authorizer.metrics,name=acls-total-count | 32-bit gauge | Total number of ACLs created in the broker
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-request-rate-per-minute | Rate per minute | Total number of authorization requests per minute
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-allowed-rate-per-minute | Rate per minute | Total number of authorization requests allowed per minute
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-denied-rate-per-minute | Rate per minute | Total number of authorization requests denied per minute
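
The full names above read like JMX-style object names: a domain, then type and name key properties. As a small illustration, the sketch below builds and decomposes such a name with the JDK's javax.management.ObjectName. The class AuthorizerMetricNames and its fullName helper are hypothetical names introduced here, and the sketch assumes the names are used verbatim; it does not claim how the broker's reporter actually publishes them.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of Kafka: composes the full names from the table.
class AuthorizerMetricNames {
    static final String DOMAIN = "kafka.server";
    static final String GROUP = "kafka.security.authorizer.metrics";

    // Builds "<domain>:type=<group>,name=<metric>" and validates it as an ObjectName.
    static ObjectName fullName(String metric) {
        try {
            return new ObjectName(DOMAIN + ":type=" + GROUP + ",name=" + metric);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid metric name: " + metric, e);
        }
    }

    public static void main(String[] args) {
        ObjectName name = fullName("acls-total-count");
        System.out.println(name.getDomain());            // domain part before the colon
        System.out.println(name.getKeyProperty("type")); // metrics group
        System.out.println(name.getKeyProperty("name")); // individual metric
    }
}
```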

Proposed Changes

kafka.server.KafkaServer will be updated to pass the Metrics instance to the Authorizer.

Code Block
languagescala
titlekafka.server.KafkaServer
/* Get the authorizer and initialize it if one is specified.*/
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
authorizer.foreach(_.monitor(metrics))
...


kafka.security.authorizer.AclAuthorizer will be updated to collect and add Authorizer metrics.

Code Block
languagescala
titlekafka.security.authorizer.AclAuthorizer
class AclAuthorizer extends Authorizer with Logging {
......
  // Set when the broker calls monitor(); holds the authorizer sensors.
  private var authorizerMetrics: AuthorizerMetrics = _
......
  override def monitor(metrics: Metrics): Unit = {
    authorizerMetrics = new AuthorizerMetrics(metrics)
  }
......

  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    logAuditMessage(requestContext, action, authorized)
    // Record the outcome of every authorization decision.
    authorizerMetrics.recordAuthorizerMetrics(authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }
...

  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"

    val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    authorizationAllowedSensor.add(metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
      "The number of authorizations allowed per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    authorizationDeniedSensor.add(metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
      "The number of authorizations denied per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    authorizationRequestSensor.add(metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
      "The number of authorization requests per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Gauge reporting the current number of ACLs held in the cache.
    metrics.addMetric(metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"), (config, now) => aclCache.size)

    // Every decision counts as a request and as either an allow or a deny.
    def recordAuthorizerMetrics(authorized: Boolean): Unit = {
      if (authorized) {
        authorizationAllowedSensor.record()
      } else {
        authorizationDeniedSensor.record()
      }
      authorizationRequestSensor.record()
    }
  }
}
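
The recording logic above implies that every request is counted once as a request and once as either allowed or denied, so the request count should equal the sum of the other two. The sketch below mirrors that bookkeeping with plain JDK counters. AuthorizerCountersSketch is a hypothetical class for illustration only, and its perMinute helper is a simple average over an elapsed interval, not Kafka's windowed Rate.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for AuthorizerMetrics, using counters instead of Kafka sensors.
class AuthorizerCountersSketch {
    final LongAdder allowed = new LongAdder();
    final LongAdder denied = new LongAdder();
    final LongAdder requests = new LongAdder();

    // Mirrors recordAuthorizerMetrics: one outcome counter plus the request counter.
    void record(boolean authorized) {
        if (authorized) {
            allowed.increment();
        } else {
            denied.increment();
        }
        requests.increment();
    }

    // Events per minute as a plain average; an assumption, not Kafka's Rate stat.
    static double perMinute(long count, long elapsedMillis) {
        return count / (elapsedMillis / 60_000.0);
    }

    public static void main(String[] args) {
        AuthorizerCountersSketch counters = new AuthorizerCountersSketch();
        counters.record(true);
        counters.record(true);
        counters.record(true);
        counters.record(false);
        System.out.println(counters.requests.sum());                      // prints 4
        System.out.println(counters.allowed.sum() + counters.denied.sum()); // prints 4
        System.out.println(perMinute(counters.requests.sum(), 120_000));  // prints 2.0
    }
}
```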

...