Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA:
...
The following metrics will be added:
Full Name | Type | Description |
---|---|---|
kafka.server:type=kafka.security.authorizer.metrics,name=acls-total-count | 32-bit gauge | Total number of ACLs defined in the broker |
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-request-rate-per-minute | Rate per minute | Number of authorization requests per minute |
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-allowed-rate-per-minute | Rate per minute | Number of authorization requests allowed per minute |
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-denied-rate-per-minute | Rate per minute | Number of authorization requests denied per minute |
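To make the "rate per minute" unit concrete, here is a minimal arithmetic sketch. It assumes a single fixed observation window, which is a simplification: Kafka's own `Rate` stat works over sampled windows, so reported values can differ. The object and method names are hypothetical and not part of this proposal.

```scala
// Hypothetical helper, for illustration only: converts an event count
// observed over an elapsed time span into events per minute.
object RatePerMinuteSketch {
  private val MillisPerMinute = 60000.0

  def ratePerMinute(eventCount: Long, elapsedMillis: Long): Double = {
    require(elapsedMillis > 0, "elapsed time must be positive")
    // Scale the count by the fraction of a minute that has elapsed.
    eventCount.toDouble / (elapsedMillis.toDouble / MillisPerMinute)
  }
}
```

For example, 150 authorization requests observed over 30 seconds correspond to a rate of 300 per minute under this simplified model.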
Proposed Changes
kafka.server.KafkaServer will be updated to pass an instance of Metrics to the Authorizer.
...
kafka.security.authorizer.AclAuthorizer will be updated to collect and add Authorizer metrics.
```scala
import java.util.concurrent.TimeUnit

import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Rate, WindowedCount}

class AclAuthorizer extends Authorizer with Logging {
  ......
  private var authorizerMetrics: AuthorizerMetrics = _
  ......
  // Receives the broker's Metrics instance and registers the authorizer metrics on it.
  override def monitor(metrics: Metrics): Unit = {
    authorizerMetrics = new AuthorizerMetrics(metrics)
  }
  ......
  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    logAuditMessage(requestContext, action, authorized)
    authorizerMetrics.recordAuthorizerMetrics(authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }
  ...
  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"

    val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    authorizationAllowedSensor.add(
      metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
        "The number of authorizations allowed per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    authorizationDeniedSensor.add(
      metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
        "The number of authorizations denied per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    authorizationRequestSensor.add(
      metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
        "The number of authorization requests per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Gauge over the current size of the ACL cache.
    metrics.addMetric(
      metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"),
      (config, now) => aclCache.size)

    // Every authorization counts as a request, plus exactly one of allowed or denied.
    def recordAuthorizerMetrics(authorized: Boolean): Unit = {
      if (authorized) {
        authorizationAllowedSensor.record()
      } else {
        authorizationDeniedSensor.record()
      }
      authorizationRequestSensor.record()
    }
  }
}
```
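The recording logic above implies that every authorization is counted once as a request and once as either allowed or denied. The following dependency-free sketch illustrates that invariant. It replaces Kafka's sensors with plain counters, and the class name and accessors are hypothetical stand-ins, not part of the proposed change.

```scala
import java.util.concurrent.atomic.LongAdder

// Hypothetical stand-in for AuthorizerMetrics: plain thread-safe counters
// take the place of Kafka sensors so the sketch runs without a broker.
final class AuthorizerCountersSketch {
  private val allowed = new LongAdder
  private val denied = new LongAdder
  private val requests = new LongAdder

  // Mirrors recordAuthorizerMetrics: one of allowed/denied, and always a request.
  def record(authorized: Boolean): Unit = {
    if (authorized) allowed.increment() else denied.increment()
    requests.increment()
  }

  def allowedCount: Long = allowed.sum()
  def deniedCount: Long = denied.sum()
  def requestCount: Long = requests.sum()
}
```

After recording any sequence of results, `requestCount` equals `allowedCount + deniedCount`. Under this reading, the request rate in the proposal would be expected to track the sum of the allowed and denied rates.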
...