Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: scala
title: kafka.security.authorizer.AclAuthorizer
class AclAuthorizer extends Authorizer with Logging {
......
  // Assigned by monitor(); stays uninitialized (null) until monitor() has been called.
  private var authorizerMetrics: AuthorizerMetrics = _
......
  /**
   * Creates the authorizer's sensors and gauges on the supplied metrics instance.
   *
   * @param metrics the metrics instance the authorizer metrics are registered on
   */
  override def monitor(metrics: Metrics): Unit = {
    authorizerMetrics = new AuthorizerMetrics(metrics)
  }
......

  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    logAuditMessage(requestContext, action, authorized)
    // A single call records both the request and its allowed/denied outcome.
    // NOTE(review): this dereferences authorizerMetrics, so it assumes monitor() ran first - confirm.
    authorizerMetrics.recordAuthorizerMetrics(authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }
......

  /**
   * Registers and updates the authorizer metrics: per-minute rates for allowed,
   * denied and total authorization requests, plus a gauge reporting `aclCache.size`.
   *
   * @param metrics the metrics instance the sensors and the gauge are registered on
   */
  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"

    val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    authorizationAllowedSensor.add(metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
      "The number of authorizations allowed per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    authorizationDeniedSensor.add(metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
      "The number of authorizations denied per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    authorizationRequestSensor.add(metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
      "The number of authorization requests per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Gauge: evaluated on read, reports the current size of aclCache.
    metrics.addMetric(metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"), (config, now) => aclCache.size)

    /**
     * Records one authorization decision: marks the allowed or the denied sensor
     * depending on the outcome, and always marks the request sensor.
     *
     * @param authorized whether the request was allowed
     */
    def recordAuthorizerMetrics(authorized: Boolean): Unit = {
      if (authorized) {
        authorizationAllowedSensor.record()
      } else {
        authorizationDeniedSensor.record()
      }
      authorizationRequestSensor.record()
    }
  }
}

...