Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

kafka.security.authorizer.AclAuthorizer will be updated for adding following Kafka metrics
Metrics group name: "kafka.security.authorizer.metrics"
Metric "aclacls-total-count" for reporting total acls created.
Metric "authorization-request-rate-per-hourminute" for reporting total number of authorization requests per hourminute
Metric "authorization-allowed-rate-per-hourminute" for reporting total number of authorization allowed per hourper minute
Metric "authorization-denied-rate-per-hourminute" for reporting total number of authorization denied per hour    per minute    

Code Block
languagescala
titlekafka.security.authorizer.AclAuthorizer
class AclAuthorizer extends Authorizer with Logging {
......
	private var metricsGroup: MetricsGroup = _
......
	override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
    	metricsGroup = new MetricsGroup(serverInfo.metrics())
    	serverInfo.endpoints.asScala.map { endpoint =>
      	endpoint -> CompletableFuture.completedFuture[Void](null) }.toMap.asJava
	}
......

	private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
		......
		// Record metrics
    	if (authorized) {
      		metricsGroup.recordAuthorizationAllowed()
    	} else {
      		metricsGroup.recordAuthorizationDenied()
    	}
   	 	metricsGroup.recordAuthorizationRequest()
		logAuditMessage(requestContext, action, authorized)
    	if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
	}

	class MetricsGroupAuthorizerMetrics(metrics: Metrics) {
    	val GROUP_NAME = "acl-authorizer-kafka.security.authorizer.metrics"
    	val authorizationAllowedSensor = metrics.sensor("aclauthorizer-authorization-allowed")
    	authorizationAllowedSensor.add(metrics.metricName("authorization-allowed-rate-per-hourminute", GROUP_NAME,
      		"The number of authoization allowed per hour"), new Rate(TimeUnit.HOURSMINUTES, new WindowedCount()))

    	val authorizationDeniedSensor = metrics.sensor("aclauthorizer-authorization-denied")
    	authorizationDeniedSensor.add(metrics.metricName("authorization-denied-rate-per-hourminute", GROUP_NAME,
      		"The number of authoization denied per hour"), new Rate(TimeUnit.HOURSMINUTES, new WindowedCount()))

    	val authorizationRequestSensor = metrics.sensor("aclauthorizer-authorization-request")
    	authorizationRequestSensor.add(metrics.metricName("authorization-request-rate-per-hourminute", GROUP_NAME,
      		"The number of authoization request per hour"), new Rate(TimeUnit.HOURSMINUTES, new WindowedCount()))

    	metrics.addMetric(metrics.metricName("aclacls-total-count", GROUP_NAME, "The number of acls defined"),
      	(config, now) => aclCache.size)

    	def recordAuthorizationAllowed(): Unit = {
      		authorizationAllowedSensor.record()
    	}

    	def recordAuthorizationDenied(): Unit = {
      		authorizationDeniedSensor.record()
    	}

    	def recordAuthorizationRequest(): Unit = {
      		authorizationRequestSensor.record()
    	}
  	}
}

Compatibility, Deprecation, and Migration Plan

Broker will start Authorizer plugins with AuthorizerServerInfo containing the new method 'Metrics metrics()' which has default implementation. Old version of Authorizer plugins will not call  the new method, so it is backward to old version of Authorizer plugins.    

Test Plan

Test total number of acls created match with the number from metrics

Test authorization request rate per minute

Test authorization allowed rate per minute

Test authorization denied rate per minute 

Rejected Alternatives

Authorizer plugins create and configure own Metrics and JmxReport. Using this alternative approach make collecting Kafka metrics in Authorizer plugin separate from broker metrics that cause disconnection. Users need implement and manage own Metrics and JmsReport with extra effort. 

...