...
Code Block | ||||
---|---|---|---|---|
| ||||
// Look up the configured authorizer (if any) and pass it the original broker configs.
authorizer = config.authorizer
authorizer.foreach { authz => authz.configure(config.originals) }

// One future per endpoint. With no authorizer configured, every endpoint gets an
// already-completed future; otherwise the futures come from the authorizer's start() call.
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
  case None =>
    brokerInfo.broker.endPoints.map { endPoint =>
      endPoint.toJava -> CompletableFuture.completedFuture[Void](null)
    }.toMap
  case Some(authz) =>
    val serverInfo = brokerInfo.broker.toServerInfo(clusterId, config, metrics)
    authz.start(serverInfo).asScala.map { case (endpoint, stage) =>
      endpoint -> stage.toCompletableFuture
    }
} |
kafka.security.authorizer.AclAuthorizer will be updated to add the following Kafka metrics:
Metric "acl-total-count" for reporting the total number of ACLs created.
Metric "authorization-request-rate-per-hour" for reporting the total number of authorization requests per hour.
Metric "authorization-allowed-rate-per-hour" for reporting the total number of allowed authorizations per hour.
Metric "authorization-denied-rate-per-hour" for reporting the total number of denied authorizations per hour.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Outline of the proposed AclAuthorizer changes: a MetricsGroup is created in start()
 * from the Metrics instance exposed by the server info, and every authorization decision
 * made in authorizeAction is recorded against it. Lines consisting of "......" stand for
 * existing code that is elided from this outline.
 */
class AclAuthorizer extends Authorizer with Logging {
  ......
  // Assigned in start(); left uninitialised until then.
  // NOTE(review): authorizeAction dereferences this unconditionally, so it presumably
  // relies on start() having been called first -- confirm against the Authorizer contract.
  private var metricsGroup: MetricsGroup = _
  ......
  /**
   * Creates the metrics group from the server's Metrics instance and returns an
   * already-completed future for every endpoint listed in the server info.
   *
   * @param serverInfo supplies the Metrics registry and the endpoints
   * @return a Java map from each endpoint to a completed future
   */
  override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
    metricsGroup = new MetricsGroup(serverInfo.metrics())
    serverInfo.endpoints.asScala.map { endpoint =>
      endpoint -> CompletableFuture.completedFuture[Void](null)
    }.toMap.asJava
  }
  ......
  /**
   * Records the outcome of the (elided) authorization check in the metrics group,
   * writes the audit log entry, and converts the outcome to an AuthorizationResult.
   */
  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    // Record metrics: exactly one of allowed/denied, plus the overall request sensor.
    if (authorized) {
      metricsGroup.recordAuthorizationAllowed()
    } else {
      metricsGroup.recordAuthorizationDenied()
    }
    metricsGroup.recordAuthorizationRequest()
    logAuditMessage(requestContext, action, authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }

  /**
   * Registers the authorizer's sensors and metrics on the given Metrics instance and
   * exposes one record method per sensor.
   *
   * @param metrics the registry on which sensors and metrics are registered
   */
  class MetricsGroup(metrics: Metrics) {
    val GROUP_NAME = "acl-authorizer-metrics"

    val authorizationAllowedSensor = metrics.sensor("acl-authorization-allowed")
    authorizationAllowedSensor.add(
      metrics.metricName("authorization-allowed-rate-per-hour", GROUP_NAME,
        "The number of authorization allowed per hour"),
      new Rate(TimeUnit.HOURS, new WindowedCount()))

    val authorizationDeniedSensor = metrics.sensor("acl-authorization-denied")
    authorizationDeniedSensor.add(
      metrics.metricName("authorization-denied-rate-per-hour", GROUP_NAME,
        "The number of authorization denied per hour"),
      new Rate(TimeUnit.HOURS, new WindowedCount()))

    val authorizationRequestSensor = metrics.sensor("acl-authorization-request")
    authorizationRequestSensor.add(
      metrics.metricName("authorization-request-rate-per-hour", GROUP_NAME,
        "The number of authorization request per hour"),
      new Rate(TimeUnit.HOURS, new WindowedCount()))

    // Metric whose value is read from the current size of aclCache.
    metrics.addMetric(metrics.metricName("acl-total-count", GROUP_NAME, "The number of acls defined"),
      (config, now) => aclCache.size)

    /** Records one event on the authorization-allowed sensor. */
    def recordAuthorizationAllowed(): Unit = {
      authorizationAllowedSensor.record()
    }

    /** Records one event on the authorization-denied sensor. */
    def recordAuthorizationDenied(): Unit = {
      authorizationDeniedSensor.record()
    }

    /** Records one event on the authorization-request sensor. */
    def recordAuthorizationRequest(): Unit = {
      authorizationRequestSensor.record()
    }
  }
} |
Compatibility, Deprecation, and Migration Plan
...