Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagescala
titlekafka.server.KafkaServer
/* Get the authorizer and initialize it if one is specified.*/
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
	case Some(authZ) => 
		authZ.start(brokerInfo.broker.toServerInfo(clusterId, config, metrics)).asScala.map { case (ep, cs) =>
    		ep -> cs.toCompletableFuture
    	}
    case None =>
    	brokerInfo.broker.endPoints.map { ep =>
        	ep.toJava -> CompletableFuture.completedFuture[Void](null)
        }.toMap
}

    


kafka.security.authorizer.AclAuthorizer

...

will

...

be

...

updated

...

for

...

adding

...

following

...

Kafka

...

metrics
Metric

...

"acl-total-count"

...

for

...

reporting

...

total

...

acls

...

created.
Metric

...

"authorization-request-rate-per-hour"

...

for

...

reporting

...

total

...

number

...

of

...

authorization

...

requests per hour
Metric "authorization-allowed-rate-per-hour" for reporting total number of authorization allowed per hour
Metric "authorization-denied-rate-per-hour" for reporting total number of authorization denied per hour    

Code Block
languagescala
titlekafka.security.authorizer.AclAuthorizer
class AclAuthorizer extends Authorizer with Logging {
......
	private var metricsGroup: MetricsGroup = _
......
	override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
    	metricsGroup = new MetricsGroup(serverInfo.metrics())
    	serverInfo.endpoints.asScala.map { endpoint =>
      	endpoint -> CompletableFuture.completedFuture[Void](null) }.toMap.asJava
	}
......

	private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
		......
		// Record metrics
    	if (authorized) {
      		metricsGroup.recordAuthorizationAllowed()
    	} else {
      		metricsGroup.recordAuthorizationDenied()
    	}
   	 	metricsGroup.recordAuthorizationRequest()
		logAuditMessage(requestContext, action, authorized)
    	if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
	}

	class MetricsGroup(metrics: Metrics) {
    	val GROUP_NAME = "acl-authorizer-metrics"
    	val authorizationAllowedSensor = metrics.sensor("acl-authorization-allowed")
    	authorizationAllowedSensor.add(metrics.metricName("authorization-allowed

...

-rate-per-hour"

...

, GROUP_NAME,
      	"The number of 

...

authoization allowed per hour

...

"), new Rate(TimeUnit.HOURS, new WindowedCount()))

    	val authorizationDeniedSensor = metrics.sensor("acl-authorization-denied")
    	authorizationDeniedSensor.add(metrics.metricName("authorization-denied-rate-per-hour"

...

, GROUP_NAME,
      	"The number of authoization denied per hour"), new Rate(TimeUnit.HOURS, new WindowedCount()))

    	val authorizationRequestSensor = metrics.sensor("acl-authorization-request")
    	authorizationRequestSensor.add(metrics.metricName("authorization

...

-request-rate-per-hour", GROUP_NAME,
      	"The number of authoization request per hour"), new Rate(TimeUnit.HOURS, new WindowedCount()))

    	metrics.addMetric(metrics.metricName("acl-total-count", GROUP_NAME, "The number of acls defined"),
      	(config, now) => aclCache.size)

    	def recordAuthorizationAllowed(): Unit = {
      		authorizationAllowedSensor.record()
    	}

    	def recordAuthorizationDenied(): Unit = {
      		authorizationDeniedSensor.record()
    	}

    	def recordAuthorizationRequest(): Unit = {
      	authorizationRequestSensor.record()
    	}
	}
}

Compatibility, Deprecation, and Migration Plan

...