Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA:
...
Define a new interface Monitorable in package org.apache.kafka.common. Authorizer plugins can implement the interface, and the broker can pass its Metrics instance to the Authorizer. Other broker or client plugins could potentially implement the interface in the future and receive the broker's or client's Metrics instance to add metrics from their sub-components. Support for other plugins will be addressed in a later KIP if necessary. This KIP focuses on exposing Kafka Metrics in the Authorizer only.
```java
package org.apache.kafka.common;

import org.apache.kafka.common.metrics.Metrics;

/**
 * Interface for plugins to get Metrics instance
 */
public interface Monitorable {
    /**
     * Receive the instance of {@link Metrics}. The default implementation does nothing.
     */
    default void monitor(Metrics metrics) {
    }
}
```
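As a rough illustration of how a plugin might use this hook, the sketch below replaces `org.apache.kafka.common.metrics.Metrics` with a hypothetical `SimpleMetrics` registry so that it runs without Kafka on the classpath. `SimpleMetrics`, `CountingPlugin`, `SilentPlugin`, and `MonitorableSketch` are invented for this example; only the shape of `monitor` follows the interface above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.kafka.common.metrics.Metrics.
class SimpleMetrics {
    private final Map<String, Supplier<Number>> gauges = new ConcurrentHashMap<>();

    void addGauge(String name, Supplier<Number> value) {
        gauges.put(name, value);
    }

    Number read(String name) {
        Supplier<Number> gauge = gauges.get(name);
        return gauge == null ? null : gauge.get();
    }

    int size() {
        return gauges.size();
    }
}

// Same shape as the proposed interface, but typed against the stand-in registry.
interface Monitorable {
    default void monitor(SimpleMetrics metrics) {
        // No-op by default, so plugins that ignore metrics need no changes.
    }
}

// A plugin that opts in and registers a gauge in the shared registry.
class CountingPlugin implements Monitorable {
    private final AtomicInteger calls = new AtomicInteger();

    @Override
    public void monitor(SimpleMetrics metrics) {
        metrics.addGauge("plugin-call-count", calls::get);
    }

    void handle() {
        calls.incrementAndGet();
    }
}

// A plugin that relies on the default method and registers nothing.
class SilentPlugin implements Monitorable {
}

class MonitorableSketch {
    // Mimics the host (the broker, in this KIP) handing its registry to each plugin.
    static SimpleMetrics wire(Monitorable... plugins) {
        SimpleMetrics metrics = new SimpleMetrics();
        for (Monitorable plugin : plugins) {
            plugin.monitor(metrics);
        }
        return metrics;
    }
}
```

Because `monitor` has a default body, `SilentPlugin` compiles unchanged, while `CountingPlugin` publishes its counter through the registry it was handed instead of creating its own.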
The org.apache.kafka.server.authorizer.Authorizer interface will be updated to extend the Monitorable interface.
```java
package org.apache.kafka.server.authorizer;
...
import org.apache.kafka.common.Monitorable;
...
public interface Authorizer extends Configurable, Closeable, Monitorable {
    ...
}
```
The following metrics will be added:
Full Name | Type | Description |
---|---|---|
kafka.server:type=kafka.security.authorizer.metrics,name=acls-total-count | 32-bit gauge | Total number of ACLs created in the broker |
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-request-rate-per-minute | Rate per minute | Number of authorization requests per minute |
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-allowed-rate-per-minute | Rate per minute | Number of allowed authorizations per minute |
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-denied-rate-per-minute | Rate per minute | Number of denied authorizations per minute |
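The Full Name column follows JMX object-name syntax, so a monitoring tool could build and split these names with the standard `javax.management.ObjectName` class. The sketch below only parses the strings from the table; it does not connect to a broker, and how the metrics are actually exposed depends on the metrics reporter in use. `MetricNameSketch` and its members are names made up for this example.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class MetricNameSketch {
    // Group and metric names copied from the table above.
    static final String GROUP = "kafka.security.authorizer.metrics";
    static final String[] METRIC_NAMES = {
        "acls-total-count",
        "authorization-request-rate-per-minute",
        "authorization-allowed-rate-per-minute",
        "authorization-denied-rate-per-minute"
    };

    // Builds the full name in the form used by the table's first column.
    static ObjectName fullName(String metric) {
        try {
            return new ObjectName("kafka.server:type=" + GROUP + ",name=" + metric);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid metric name: " + metric, e);
        }
    }

    public static void main(String[] args) {
        for (String metric : METRIC_NAMES) {
            ObjectName name = fullName(metric);
            System.out.println(name.getDomain() + " | "
                + name.getKeyProperty("type") + " | "
                + name.getKeyProperty("name"));
        }
    }
}
```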
Proposed Changes
kafka.server.KafkaServer will be updated to pass its Metrics instance to the Authorizer.
...
kafka.security.authorizer.AclAuthorizer will be updated to collect and add the following Authorizer metrics:
Metrics group name: "kafka.security.authorizer.metrics"
Metric "acls-total-count" reports the total number of ACLs created.
Metric "authorization-request-rate-per-minute" reports the number of authorization requests per minute.
Metric "authorization-allowed-rate-per-minute" reports the number of allowed authorizations per minute.
Metric "authorization-denied-rate-per-minute" reports the number of denied authorizations per minute.
```scala
class AclAuthorizer extends Authorizer with Logging {
  ......
  private var authorizerMetrics: AuthorizerMetrics = _
  ......
  override def monitor(metrics: Metrics): Unit = {
    authorizerMetrics = new AuthorizerMetrics(metrics)
  }
  ......
  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    logAuditMessage(requestContext, action, authorized)
    // Record the request together with its outcome
    authorizerMetrics.recordAuthorizerMetrics(authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }
  ...
  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"

    val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    authorizationAllowedSensor.add(
      metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME, "The number of authorizations allowed per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    authorizationDeniedSensor.add(
      metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME, "The number of authorizations denied per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    authorizationRequestSensor.add(
      metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME, "The number of authorization requests per minute"),
      new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Gauge backed by the current size of the ACL cache
    metrics.addMetric(metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"), (config, now) => aclCache.size)

    def recordAuthorizerMetrics(authorized: Boolean): Unit = {
      if (authorized) {
        authorizationAllowedSensor.record()
      } else {
        authorizationDeniedSensor.record()
      }
      authorizationRequestSensor.record()
    }
  }
}
```
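The recording logic can be tried in isolation with plain counters. The Java sketch below mirrors `recordAuthorizerMetrics` using `LongAdder` in place of Kafka sensors; `AuthorizerCountersSketch` and `perMinute` are hypothetical names. `perMinute` is a deliberate simplification (events divided by elapsed minutes) and not how Kafka's `Rate` and `WindowedCount` manage their sample windows.

```java
import java.util.concurrent.atomic.LongAdder;

class AuthorizerCountersSketch {
    private final LongAdder requests = new LongAdder();
    private final LongAdder allowed = new LongAdder();
    private final LongAdder denied = new LongAdder();

    // Every decision counts as one request and as exactly one of allowed or denied.
    void recordAuthorizerMetrics(boolean authorized) {
        if (authorized) {
            allowed.increment();
        } else {
            denied.increment();
        }
        requests.increment();
    }

    long requests() { return requests.sum(); }
    long allowed() { return allowed.sum(); }
    long denied() { return denied.sum(); }

    // Simplified per-minute rate: event count scaled to a 60-second interval.
    static double perMinute(long count, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return count * 60_000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        AuthorizerCountersSketch counters = new AuthorizerCountersSketch();
        boolean[] decisions = {true, true, false, true, false};
        for (boolean decision : decisions) {
            counters.recordAuthorizerMetrics(decision);
        }
        System.out.println("requests=" + counters.requests()
            + " allowed=" + counters.allowed()
            + " denied=" + counters.denied());
    }
}
```

Recording the outcome and the request in a single call keeps the invariant that allowed plus denied equals the request count, which separate calls at each call site would not guarantee.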
...
Authorizer plugins create and configure their own Metrics and JmxReporter. With this approach, the Kafka metrics collected in the Authorizer plugin are kept separate from the broker metrics, so the two are disconnected, and users must implement and manage their own Metrics and JmxReporter at extra effort.
Pass the instance of Metrics through the AuthorizerServerInfo interface. The Metrics instance should not be part of the Authorizer server information. An interface is used to expose Kafka Metrics so that other broker or client plugins could potentially implement it.