Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagescala
titlekafka.security.authorizer.AclAuthorizer
class AclAuthorizer extends Authorizer with Logging {
......
	private var metricsGroupauthorizerMetrics: MetricsGroupAuthorizerMetrics = _
......
	override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
    	metricsGroupauthorizerMetrics = new MetricsGroup(serverInfo.metrics())
    	serverInfo.endpoints.asScala.map { endpoint =>
      	endpoint -> CompletableFuture.completedFuture[Void](null) }.toMap.asJava
	}
......

	private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
		......
		// Record metrics authorization requests
    	authorizerMetrics.recordAuthorizationRequest()

		logAuditMessage(requestContext, action, authorized)
    	if (authorized) {
      		metricsGroupauthorizerMetrics.recordAuthorizationAllowed()
      		AuthorizationResult.ALLOWED
    	} else {
      		metricsGroupauthorizerMetrics.recordAuthorizationDenied()
    	}
   	 	metricsGroupAuthorizationResult.recordAuthorizationRequest()
		logAuditMessage(requestContext, action, authorized)
DENIED
     	if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED	}
	}

	class AuthorizerMetrics(metrics: Metrics) {
    	val GROUP_NAME = "kafka.security.authorizer.metrics"
    	val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    	authorizationAllowedSensor.add(metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
      		"The number of authoization allowed per hour"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    	val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    	authorizationDeniedSensor.add(metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
      		"The number of authoization denied per hour"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    	val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    	authorizationRequestSensor.add(metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
      		"The number of authoization request per hour"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    	metrics.addMetric(metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"), (config, now) => aclCache.size)

    	def recordAuthorizationAllowed(): Unit = {
      		authorizationAllowedSensor.record()
    	}

    	def recordAuthorizationDenied(): Unit = {
      	authorizationDeniedSensor.record()
    	}

    	def recordAuthorizationRequest(): Unit = {
      		authorizationRequestSensor.record()
    	}
  	}
}

...