Status

Current state: Accepted

Discussion thread: here 

Vote thread: here 

JIRA: KAFKA-9958

...


The following metrics will be added:

| Full Name | Type | Description |
|---|---|---|
| kafka.server:type=kafka.security.authorizer.metrics,name=acls-total-count | 32-bit gauge | Total number of ACLs created in the broker |
| kafka.server:type=kafka.security.authorizer.metrics,name=authorization-request-rate-per-minute | Rate per minute | Number of authorization requests per minute |
| kafka.server:type=kafka.security.authorizer.metrics,name=authorization-allowed-rate-per-minute | Rate per minute | Number of allowed authorizations per minute |
| kafka.server:type=kafka.security.authorizer.metrics,name=authorization-denied-rate-per-minute | Rate per minute | Number of denied authorizations per minute |
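The full names listed above follow the JMX `ObjectName` layout (`domain:type=...,name=...`), so a monitoring client could build and inspect them with the standard `javax.management.ObjectName` class. The following is a minimal sketch under that assumption; the object `AuthorizerMetricNames` and its helper `objectNameFor` are hypothetical names introduced for illustration and are not part of this proposal.

```scala
import javax.management.ObjectName

// Hypothetical helper (not part of the proposal): builds JMX ObjectNames
// for the metrics listed in this section.
object AuthorizerMetricNames {
  val Domain = "kafka.server"
  val MetricType = "kafka.security.authorizer.metrics"

  // Metric names exactly as given in the "Full Name" column.
  val MetricNames: Seq[String] = Seq(
    "acls-total-count",
    "authorization-request-rate-per-minute",
    "authorization-allowed-rate-per-minute",
    "authorization-denied-rate-per-minute"
  )

  // Assembles domain:type=...,name=... for a single metric.
  def objectNameFor(metric: String): ObjectName =
    new ObjectName(s"$Domain:type=$MetricType,name=$metric")

  def main(args: Array[String]): Unit =
    MetricNames.map(objectNameFor).foreach { on =>
      // Print the parsed parts to show how the full name decomposes.
      println(s"domain=${on.getDomain} type=${on.getKeyProperty("type")} name=${on.getKeyProperty("name")}")
    }
}
```

A client would typically pass such an `ObjectName` to an `MBeanServerConnection` to read the metric's attributes; the attribute names exposed by the broker are not specified in this section.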

Proposed Changes

kafka.server.KafkaServer will be updated to pass an instance of Metrics to the Authorizer.

...

kafka.security.authorizer.AclAuthorizer will be updated to collect and add Authorizer metrics.

kafka.security.authorizer.AclAuthorizer

```scala
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Rate, WindowedCount}

class AclAuthorizer extends Authorizer with Logging {
......
  // Created in monitor(), once the broker passes in its Metrics instance.
  private var authorizerMetrics: AuthorizerMetrics = _
......
  override def monitor(metrics: Metrics): Unit = {
    authorizerMetrics = new AuthorizerMetrics(metrics)
  }
......

  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    logAuditMessage(requestContext, action, authorized)
    // Record the outcome of every authorization decision.
    authorizerMetrics.recordAuthorizerMetrics(authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }
...

  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"

    // One sensor per outcome, each exposing a per-minute rate.
    val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    authorizationAllowedSensor.add(metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
      "The number of authorizations allowed per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    authorizationDeniedSensor.add(metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
      "The number of authorizations denied per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    authorizationRequestSensor.add(metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
      "The number of authorization requests per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Gauge that reports the current size of the ACL cache.
    metrics.addMetric(metrics.metricName("acls-total-count", GROUP_NAME, "The number of ACLs defined"), (config, now) => aclCache.size)

    // Every call counts as one request, plus exactly one of allowed or denied.
    def recordAuthorizerMetrics(authorized: Boolean): Unit = {
      if (authorized) {
        authorizationAllowedSensor.record()
      } else {
        authorizationDeniedSensor.record()
      }
      authorizationRequestSensor.record()
    }
  }
}
```
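To see the sensor pattern from `AuthorizerMetrics` in isolation, the sketch below registers the three rate sensors against a standalone `Metrics` instance, records a sequence of outcomes, and returns the metric names registered in the group. It assumes `kafka-clients` is on the classpath and Scala 2.13 (for `scala.jdk.CollectionConverters`); the object `AuthorizerMetricsSketch` and its methods `rateSensor` and `run` are hypothetical names used only for illustration, not code from the proposal.

```scala
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.metrics.{Metrics, Sensor}
import org.apache.kafka.common.metrics.stats.{Rate, WindowedCount}
import scala.jdk.CollectionConverters._

// Hypothetical standalone sketch; assumes kafka-clients is available.
object AuthorizerMetricsSketch {
  val GroupName = "kafka.security.authorizer.metrics"

  // Registers one per-minute rate metric on a named sensor.
  private def rateSensor(metrics: Metrics, sensorName: String,
                         metricName: String, description: String): Sensor = {
    val sensor = metrics.sensor(sensorName)
    sensor.add(metrics.metricName(metricName, GroupName, description),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))
    sensor
  }

  // Records the given outcomes (true = allowed, false = denied) and
  // returns the names of all metrics registered in GroupName.
  def run(outcomes: Seq[Boolean]): Set[String] = {
    val metrics = new Metrics()
    try {
      val allowed = rateSensor(metrics, "authorizer-authorization-allowed",
        "authorization-allowed-rate-per-minute", "Authorizations allowed per minute")
      val denied = rateSensor(metrics, "authorizer-authorization-denied",
        "authorization-denied-rate-per-minute", "Authorizations denied per minute")
      val requests = rateSensor(metrics, "authorizer-authorization-request",
        "authorization-request-rate-per-minute", "Authorization requests per minute")

      outcomes.foreach { authorized =>
        // Same bookkeeping as recordAuthorizerMetrics: one outcome sensor
        // plus the request sensor for every decision.
        (if (authorized) allowed else denied).record()
        requests.record()
      }

      metrics.metrics().asScala.keys
        .filter(_.group() == GroupName)
        .map(_.name())
        .toSet
    } finally metrics.close()
  }

  def main(args: Array[String]): Unit =
    run(Seq(true, true, false)).toSeq.sorted.foreach(println)
}
```

The rate values themselves depend on the sampling window and wall-clock time, so the sketch reports only which metrics were registered rather than asserting specific rates.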

...