Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

kafka.security.authorizer.AclAuthorizer will be updated to collect and add Authorizer metrics.

Code Block
languagescala
titlekafka.security.authorizer.AclAuthorizer
class AclAuthorizer extends Authorizer with Logging {
......
	private var authorizerMetrics: AuthorizerMetrics = _
......
	override def monitor(metrics: Metrics): Unit = {
    	authorizerMetrics = new AuthorizerMetrics(metrics)
  	}
......

	private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
		......
		logAuditMessage(requestContext, action, authorized)
		authorizerMetrics.recordAuthorizerMetrics(authorized)
    	if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
	}
...

	class AuthorizerMetrics(metrics: Metrics) {
    	val GROUP_NAME = "kafka.security.authorizer.metrics"
    	val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    	authorizationAllowedSensor.add(metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
      		"The number of authoization allowed per hour"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    	val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    	authorizationDeniedSensor.add(metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
      		"The number of authoization denied per hour"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    	val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    	authorizationRequestSensor.add(metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
      		"The number of authoization request per hour"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    	metrics.addMetric(metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"), (config, now) => aclCache.size)

    	def recordAuthorizerMetrics(authorized: Boolean): Unit = {
      		if (authorized) {
        		authorizationAllowedSensor.record()
      		} else {
        		authorizationDeniedSensor.record()
      		}
      		authorizationRequestSensor.record()
    	}
  	}
}

...