THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.common; import org.apache.kafka.common.metrics.Metrics; /** * Interface for plugins to get Metrics instance */ public interface Monitorable { /** * Get the instance of {@link Metrics}. */ default void monitor(Metrics metrics) { return; }; } |
The org.apache.kafka.server.authorizer.Authorizer
...
interface needs to be updated to
...
extend the Monitorable interface.
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.server.authorizer.Authorizer |
...
... import org.apache.kafka.common.Monitorable; |
...
... public interface Authorizer extends Configurable, Closeable, Monitorable { ... } |
The following metrics will be added. Metrics group name: "kafka.security.authorizer.metrics"
Full Name | Type | Description |
---|---|---|
kafka.server:type=kafka.security.authorizer.metrics |
...
,name=acls-total-count |
...
32-bit gauge | Total number of ACLs created in the broker | |
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-request-rate-per- |
...
minute | Rate per minute | Total number of authorization requests per minute |
...
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-allowed-rate-per- |
...
minute | Rate per minute | Total number of authorization allowed per minute |
...
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-denied-rate-per- |
...
minute | Rate per minute | Total number of authorization denied |
...
per minute |
Proposed Changes
kafka.server.KafkaServer will be updated to pass the Metrics instance to the Authorizer.
Code Block | ||||
---|---|---|---|---|
| ||||
/* Look up the authorizer from the config; when one is present, configure it
 * with the original config values and then hand it the Metrics instance. */
authorizer = config.authorizer
authorizer.foreach { auth =>
  auth.configure(config.originals)
  auth.monitor(metrics)
}
... |
kafka.security.authorizer.AclAuthorizer will be updated to collect Authorizer metrics.
Code Block | ||||
---|---|---|---|---|
| ||||
class AclAuthorizer extends Authorizer with Logging {
  ......
  // Set by monitor(); stays None when no Metrics instance was ever supplied.
  private var authorizerMetrics: Option[AuthorizerMetrics] = None
  ......

  /**
   * Receives the Metrics instance and registers the authorizer metrics on it.
   *
   * @param metrics the Metrics instance to register the authorizer metrics with
   */
  override def monitor(metrics: Metrics): Unit = {
    authorizerMetrics = Some(new AuthorizerMetrics(metrics))
  }
  ......

  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    logAuditMessage(requestContext, action, authorized)
    // Record only when monitor() has been called; previously a missing call
    // left the field null and this line threw a NullPointerException.
    authorizerMetrics.foreach(_.recordAuthorizerMetrics(authorized))
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }
  ...

  /**
   * Registers and updates the authorizer metrics in the
   * "kafka.security.authorizer.metrics" group: three per-minute rates
   * (allowed, denied, all requests) and a gauge reporting `aclCache.size`.
   *
   * @param metrics the Metrics instance the sensors and the gauge are registered with
   */
  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"

    val authorizationAllowedSensor = rateSensor(
      "authorizer-authorization-allowed",
      "authorization-allowed-rate-per-minute",
      "The number of authorization allowed per minute")

    val authorizationDeniedSensor = rateSensor(
      "authorizer-authorization-denied",
      "authorization-denied-rate-per-minute",
      "The number of authorization denied per minute")

    val authorizationRequestSensor = rateSensor(
      "authorizer-authorization-request",
      "authorization-request-rate-per-minute",
      "The number of authorization request per minute")

    // Gauge evaluated on read: reports the current size of the enclosing authorizer's aclCache.
    metrics.addMetric(metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"),
      (config, now) => aclCache.size)

    /**
     * Records one authorization decision: marks either the allowed or the
     * denied sensor, and always marks the request sensor.
     *
     * @param authorized whether the request was authorized
     */
    def recordAuthorizerMetrics(authorized: Boolean): Unit = {
      val outcomeSensor = if (authorized) authorizationAllowedSensor else authorizationDeniedSensor
      outcomeSensor.record()
      authorizationRequestSensor.record()
    }

    // Creates a sensor and attaches a per-minute windowed-count rate metric to it.
    private def rateSensor(sensorName: String, metricName: String, description: String) = {
      val sensor = metrics.sensor(sensorName)
      sensor.add(metrics.metricName(metricName, GROUP_NAME, description),
        new Rate(TimeUnit.MINUTES, new WindowedCount()))
      sensor
    }
  }
}
...