Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA:
...
```java
package org.apache.kafka.common;

import org.apache.kafka.common.metrics.Metrics;

/**
 * Interface for plugins to get Metrics instance
 */
public interface Monitorable {
    /**
     * Get the instance of {@link Metrics}.
     */
    default void monitor(Metrics metrics) {
    }
}
```
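Because `monitor` is a default method with an empty body, a plugin that does not override it should keep compiling and behave as before. The sketch below illustrates this with stand-in types only. `StubMetrics`, `MonitorableSketch`, `LegacyPlugin`, and `MeteredPlugin` are hypothetical names chosen for illustration and are not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.metrics.Metrics,
// used so the sketch compiles without Kafka on the classpath.
class StubMetrics {
    final List<String> registered = new ArrayList<>();

    void register(String name) {
        registered.add(name);
    }
}

// Same shape as the proposed Monitorable interface: a no-op default method.
interface MonitorableSketch {
    default void monitor(StubMetrics metrics) {
        // Intentionally empty.
    }
}

// A plugin that does not override monitor; it inherits the no-op.
class LegacyPlugin implements MonitorableSketch {
}

// A plugin that opts in and registers a metric when handed the registry.
class MeteredPlugin implements MonitorableSketch {
    @Override
    public void monitor(StubMetrics metrics) {
        metrics.register("acls-total-count");
    }
}
```

Calling `monitor` on a `LegacyPlugin` leaves the registry untouched, while calling it on a `MeteredPlugin` adds one entry.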
The following metrics would be added:
Metrics group name: "kafka.security.authorizer.metrics"
Metric "acls-total-count" reports the total number of ACLs created.
Metric "authorization-request-rate-per-minute" reports the number of authorization requests per minute.
Metric "authorization-allowed-rate-per-minute" reports the number of allowed authorizations per minute.
Metric "authorization-denied-rate-per-minute" reports the number of denied authorizations per minute.
Proposed Changes
The org.apache.kafka.server.authorizer.Authorizer interface needs to be updated to extend the Monitorable interface.
```java
package org.apache.kafka.server.authorizer;
...
import org.apache.kafka.common.Monitorable;
...
public interface Authorizer extends Configurable, Closeable, Monitorable {
    ...
}
```
The following metrics will be added:
Full Name | Type | Description |
---|---|---|
kafka.server:type=kafka.security.authorizer.metrics,name=acls-total-count | 32-bit gauge | Total number of ACLs created in the broker |
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-request-rate-per-minute | Rate per minute | Number of authorization requests per minute |
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-allowed-rate-per-minute | Rate per minute | Number of allowed authorizations per minute |
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-denied-rate-per-minute | Rate per minute | Number of denied authorizations per minute |
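The full names in the table follow the JMX `domain:key=value,...` form. As a small illustration (not part of the proposal), the standard `javax.management.ObjectName` class can build and split such a name. The class `AuthorizerMetricNames` and its method `objectNameFor` are hypothetical helpers introduced only for this sketch.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class AuthorizerMetricNames {
    static final String DOMAIN = "kafka.server";
    static final String GROUP = "kafka.security.authorizer.metrics";

    // Build the JMX object name for one metric in the authorizer group.
    static ObjectName objectNameFor(String metricName) {
        try {
            return new ObjectName(DOMAIN + ":type=" + GROUP + ",name=" + metricName);
        } catch (MalformedObjectNameException e) {
            // Surface a malformed metric name as an unchecked exception.
            throw new IllegalArgumentException("Invalid metric name: " + metricName, e);
        }
    }
}
```

For example, `AuthorizerMetricNames.objectNameFor("acls-total-count")` yields a name whose domain is `kafka.server` and whose `type` key is the metrics group. Which attribute carries the value depends on the metrics reporter in use, so reading the value is left out here.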
kafka.server.KafkaServer will be updated to pass the Metrics instance to the Authorizer.
```scala
/* Get the authorizer and initialize it if one is specified. */
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
authorizer.foreach(_.monitor(metrics))
...
```
kafka.security.authorizer.AclAuthorizer will be updated to collect and add Authorizer metrics.
```scala
class AclAuthorizer extends Authorizer with Logging {
  ......
  private var authorizerMetrics: AuthorizerMetrics = _
  ......
  override def monitor(metrics: Metrics): Unit = {
    authorizerMetrics = new AuthorizerMetrics(metrics)
  }
  ......
  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    logAuditMessage(requestContext, action, authorized)
    authorizerMetrics.recordAuthorizerMetrics(authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }
  ...
  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"

    // Rate of allowed authorizations, per minute.
    val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    authorizationAllowedSensor.add(
      metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
        "The number of authorizations allowed per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Rate of denied authorizations, per minute.
    val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    authorizationDeniedSensor.add(
      metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
        "The number of authorizations denied per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Rate of all authorization requests, per minute.
    val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    authorizationRequestSensor.add(
      metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
        "The number of authorization requests per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Gauge reporting the current size of the ACL cache.
    metrics.addMetric(
      metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"),
      (config, now) => aclCache.size)

    def recordAuthorizerMetrics(authorized: Boolean): Unit = {
      if (authorized) {
        authorizationAllowedSensor.record()
      } else {
        authorizationDeniedSensor.record()
      }
      authorizationRequestSensor.record()
    }
  }
}
```
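The recording logic in `recordAuthorizerMetrics` means every call records on the request sensor plus exactly one of the allowed or denied sensors. A minimal sketch of that invariant follows, using plain counters instead of Kafka sensors. `AuthorizerCountersSketch` is a hypothetical name; it keeps running totals and does not reproduce the windowed `Rate` computation.

```java
import java.util.concurrent.atomic.LongAdder;

class AuthorizerCountersSketch {
    private final LongAdder allowed = new LongAdder();
    private final LongAdder denied = new LongAdder();
    private final LongAdder requests = new LongAdder();

    // Mirrors recordAuthorizerMetrics: one outcome counter plus the request counter.
    void record(boolean authorized) {
        if (authorized) {
            allowed.increment();
        } else {
            denied.increment();
        }
        requests.increment();
    }

    long allowed() { return allowed.sum(); }

    long denied() { return denied.sum(); }

    long requests() { return requests.sum(); }
}
```

Under this scheme the request total always equals the sum of the allowed and denied totals, which is a handy sanity check when reading the three rate metrics side by side.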
...