THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Sketch of the ACL authorizer with authorization metrics wired in.
 *
 * Lines marked `// ......` stand for code that the original snippet elides.
 */
class AclAuthorizer extends Authorizer with Logging {
  // ......

  // Assigned in start(); it is not initialised before that call.
  private var authorizerMetrics: AuthorizerMetrics = _

  // ......

  /**
   * Creates the metrics holder from the metrics registry exposed by `serverInfo`
   * and returns an already-completed future for every endpoint.
   *
   * @param serverInfo server information supplying the endpoints and the metrics registry
   * @return a map from each endpoint to a completed future
   */
  override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
    authorizerMetrics = new AuthorizerMetrics(serverInfo.metrics())
    serverInfo.endpoints.asScala.map { endpoint =>
      endpoint -> CompletableFuture.completedFuture[Void](null)
    }.toMap.asJava
  }

  // ......

  /**
   * Records the request, writes the audit log entry, records the outcome and
   * returns it.
   *
   * @param requestContext context of the request being authorized
   * @param action         action being authorized
   * @return `ALLOWED` when `authorized` is true, `DENIED` otherwise
   */
  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    // ...... (elided in the original snippet; `authorized` is defined in this part)

    // Every request is counted once, regardless of its outcome.
    authorizerMetrics.recordAuthorizationRequest()
    logAuditMessage(requestContext, action, authorized)

    if (authorized) {
      authorizerMetrics.recordAuthorizationAllowed()
      AuthorizationResult.ALLOWED
    } else {
      authorizerMetrics.recordAuthorizationDenied()
      AuthorizationResult.DENIED
    }
  }

  /**
   * Sensors and gauges published by the authorizer.
   *
   * Declared inside `AclAuthorizer` because the ACL-count gauge reads the
   * enclosing instance's `aclCache`.
   *
   * @param metrics registry in which the sensors and the gauge are registered
   */
  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"

    val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    authorizationAllowedSensor.add(
      metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
        "The number of authorizations allowed per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    authorizationDeniedSensor.add(
      metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
        "The number of authorizations denied per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    authorizationRequestSensor.add(
      metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
        "The number of authorization requests per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Gauge: evaluated whenever the metric is read, so it reflects the current cache size.
    metrics.addMetric(
      metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"),
      (config, now) => aclCache.size)

    /** Records one allowed authorization. */
    def recordAuthorizationAllowed(): Unit = {
      authorizationAllowedSensor.record()
    }

    /** Records one denied authorization. */
    def recordAuthorizationDenied(): Unit = {
      authorizationDeniedSensor.record()
    }

    /** Records one authorization request. */
    def recordAuthorizationRequest(): Unit = {
      authorizationRequestSensor.record()
    }
  }
}
...