...
kafka.security.authorizer.AclAuthorizer will be updated to add the following Kafka metrics:
Metrics group name: "kafka.security.authorizer.metrics"
Metric "acls-total-count" for reporting the total number of ACLs created.
Metric "authorization-request-rate-per-minute" for reporting the total number of authorization requests per minute
Metric "authorization-allowed-rate-per-minute" for reporting the total number of authorizations allowed per minute
Metric "authorization-denied-rate-per-minute" for reporting the total number of authorizations denied per minute
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Sketch of the proposed AclAuthorizer changes. Lines consisting of "......" stand for
 * existing code that is elided here.
 */
class AclAuthorizer extends Authorizer with Logging {
  ......
  // Assigned in start(), built from the Metrics instance exposed by AuthorizerServerInfo.
  private var metricsGroup: AuthorizerMetrics = _
  ......

  override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
    metricsGroup = new AuthorizerMetrics(serverInfo.metrics())
    serverInfo.endpoints.asScala.map { endpoint =>
      endpoint -> CompletableFuture.completedFuture[Void](null)
    }.toMap.asJava
  }
  ......

  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    // Record metrics: exactly one of allowed/denied per action, plus the overall request count.
    if (authorized) {
      metricsGroup.recordAuthorizationAllowed()
    } else {
      metricsGroup.recordAuthorizationDenied()
    }
    metricsGroup.recordAuthorizationRequest()

    logAuditMessage(requestContext, action, authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }

  /**
   * Registers the authorizer sensors and the ACL-count gauge on the supplied registry and
   * exposes one record method per sensor.
   *
   * @param metrics the registry on which the sensors and the gauge are registered
   */
  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"

    val authorizationAllowedSensor = metrics.sensor("aclauthorizer-authorization-allowed")
    authorizationAllowedSensor.add(
      metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
        "The number of authorization allowed per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationDeniedSensor = metrics.sensor("aclauthorizer-authorization-denied")
    authorizationDeniedSensor.add(
      metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
        "The number of authorization denied per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationRequestSensor = metrics.sensor("aclauthorizer-authorization-request")
    authorizationRequestSensor.add(
      metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
        "The number of authorization request per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Gauge evaluated on read: reports the current size of the enclosing authorizer's aclCache.
    metrics.addMetric(
      metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"),
      (config, now) => aclCache.size)

    /** Records one occurrence on the authorization-allowed sensor. */
    def recordAuthorizationAllowed(): Unit = {
      authorizationAllowedSensor.record()
    }

    /** Records one occurrence on the authorization-denied sensor. */
    def recordAuthorizationDenied(): Unit = {
      authorizationDeniedSensor.record()
    }

    /** Records one occurrence on the authorization-request sensor. */
    def recordAuthorizationRequest(): Unit = {
      authorizationRequestSensor.record()
    }
  }
}
Compatibility, Deprecation, and Migration Plan
The broker will start Authorizer plugins with an AuthorizerServerInfo containing the new method 'Metrics metrics()', which has a default implementation. Old versions of Authorizer plugins will not call the new method, so the change is backward compatible with old versions of Authorizer plugins.
Test Plan
Test that the total number of ACLs created matches the number reported by the metric
Test authorization request rate per minute
Test authorization allowed rate per minute
Test authorization denied rate per minute
Rejected Alternatives
Authorizer plugins create and configure their own Metrics and JmxReporter. With this alternative approach, the Kafka metrics collected in the Authorizer plugin are separate from the broker metrics, which leaves the two disconnected. Users would also need to implement and manage their own Metrics and JmxReporter, with extra effort.
...