Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state[Under Discussion]Accepted

Discussion thread: here 

Vote thread: here 

JIRA: here 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-9958

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Current Kafka authorizer plugin cannot access to Kafka broker metrics information as pluginKafka authorizer plugins Kafka authorizer plugins need access to runtime broker Metrics instance to add additional metrics.  There is disconnection for between how to manage Metrics between broker and authorizer plugins in current Kafka implementation. The authorizer The authorizer plugins as plugins as plugin of broker could use same Metrics instance as in broker, so authorization plugin so authorizer plugins need not manage tasks like creating and configuring Metrics and JmxReporter. With the feature of this KIP, Authorizer  authorizer plugin can use broker's Metrics to manage and create Kafka instance to add additional metrics very easy.

Public Interfaces

AuthorizerServerInfo interface provides runtime broker configuration to authorization plugins including cluster resource, broker id, cluster id and endpoint information. A new method 'Metrics metrics()' for accessing to broker Metrics information in broker is added to this interfaceDefine a new interface Monitorable in package org.apache.kafka.common. The authorizer plugins can implement the interface and broker can pass the instance of Metrics to Authorizer. Other broker'd or client's plugins could potentially implement the interface in the future and get the broker's or client's Metrics instance to add additional metrics from sub-components. Support for other plugins will be addressed later in another KIP if necessary. This KIP will focus on exposing Kafka Metrics in Authorizer only.

Code Block
languagejava
titleBroker Runtime ConfigMonitorable Interface
package org.apache.kafka.server.authorizer;

import java.util.Collection;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.metrics.Metrics;

/**
 * RuntimeInterface brokerfor configuration metadataplugins provided to authorizersget during start up.Metrics instance
 */
@InterfaceStability.Evolving
public interface AuthorizerServerInfoMonitorable {

    /**
     * Returns cluster metadata forGet the brokerinstance runningof this authorizer including cluster id{@link Metrics}.
     */
    default ClusterResourcevoid clusterResourcemonitor(Metrics metrics) {
		return;

    /**
     * Returns broker id. This may be a generated broker id if `broker.id` was not configured.
     */
    int brokerId();

    /**Metrcis
     * Returns endpoints for all listeners including the advertised host and port to which
     * the listener is bound.
     */
    Collection<Endpoint> endpoints();

    /**
     * Returns the inter-broker endpoint. This is one of the endpoints returned by {@link #endpoints()}.
     */
    Endpoint interBrokerEndpoint();

    // Confluent extensions to Authorizer API

    /**
     * Returns the instance of {@link Metrics} in this broker.
     */
    default Metrics metrics() {
        return null;
    }
}

Proposed Changes

	};
}


The org.apache.kafka.server.authorizer.Authorizer interface need be updated to extend Monitorable Interface.

Code Block
languagejava
titleAuthorizer Interface
package org.apache.kafka.server.authorizer.Authorizer
... 
import org.apache.kafka.common.Monitorable;
...
public interface Authorizer extends Configurable, Closeable, Monitorable {
...
}


The following metrics will be added:

Full NameTypeDescription
kafka.server:type=kafka.security.authorizer.metrics,name=acls-total-count32-bit gaugeTotal acls created in the broker
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-request-rate-per-minuteRate per minuteTotal number of authorization requests per minute
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-allowed-rate-per-minuteRate per minuteTotal number of authorization allowed per minute
kafka.server:type=kafka.security.authorizer.metrics,name=authorization-denied-rate-per-minuteRate per minuteTotal number of authorization denied per minute 

Proposed Changes

kafka.server.KafkaServer will be updated to pass instance of Metrics to Authorizer.

Code Block
languagescala
titlekafka.server.KafkaServer
/* Get the authorizer and initialize it if one is specified.*/
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
authorizer.foreach(_.monitor(metrics))
...


kafka.security.authorizer.AclAuthorizer will be updated to collect and add Authorizer metrics.Both object and class of Broker in kafka.cluster.Broker.scala will be updated to accept a new parameter referring to Kafka Metrics. 

Code Block
languagescala
titlekafka.security.clusterauthorizer.BrokerAclAuthorizer
objectclass BrokerAclAuthorizer {
extends Authorizer private[cluster] case class ServerInfo(clusterResource: ClusterResource,
                                         brokerId: Int,
                                         endpoints: util.List[Endpoint]with Logging {
......
	private var authorizerMetrics: AuthorizerMetrics = _
......
	override def monitor(metrics: Metrics): Unit = {
    	authorizerMetrics = new AuthorizerMetrics(metrics)
  	}
......

	private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
		......
		logAuditMessage(requestContext, action, authorized)
		authorizerMetrics.recordAuthorizerMetrics(authorized)
    	if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
	}
...

	class AuthorizerMetrics(metrics: Metrics) {
    	val GROUP_NAME = "kafka.security.authorizer.metrics"
    	val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    	authorizationAllowedSensor.add(metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
      		"The number of authoization allowed per hour"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    	val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    	authorizationDeniedSensor.add(metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
      		"The number of authoization denied per  interBrokerEndpoint: Endpoint,
										 metrics: Metrics) extends AuthorizerServerInfo
}

case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String]) {
  ......
  def toServerInfo(clusterId: String, config: KafkaConfig, metrics: Metrics): AuthorizerServerInfo = {
    val clusterResource: ClusterResource = new ClusterResource(clusterId)
    val interBrokerEndpoint: Endpoint = endPoint(config.interBrokerListenerName).toJava
    val brokerEndpoints: util.List[Endpoint] = endPoints.toList.map(_.toJava).asJava
    Broker.ServerInfo(clusterResource, id, brokerEndpoints, interBrokerEndpoint, metrics)
  }
}

kafka.server.KafkaServer will be updated to pass instance of Metrics to start method in Authorizer Object.

Code Block
languagescala
titlekafka.server.KafkaServer
/* Get the authorizer and initialize it if one is specified.*/
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
	case Some(authZ) => 
		authZ.start(brokerInfo.broker.toServerInfo(clusterId, config, metrics)).asScala.map { case (ep, cs) =>
    		ep -> cs.toCompletableFuture
    	}
    case None =>
    	brokerInfo.broker.endPoints.map { ep =>
        	ep.toJava -> CompletableFuture.completedFuture[Void](null)
        }.toMap
}

...

hour"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    	val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    	authorizationRequestSensor.add(metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
      		"The number of authoization request per hour"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    	metrics.addMetric(metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"), (config, now) => aclCache.size)

    	def recordAuthorizerMetrics(authorized: Boolean): Unit = {
      		if (authorized) {
        		authorizationAllowedSensor.record()
      		} else {
        		authorizationDeniedSensor.record()
      		}
      		authorizationRequestSensor.record()
    	}
  	}
}

Compatibility, Deprecation, and Migration Plan

Broker will start Authorizer plugins with AuthorizerServerInfo containing the new method 'Metrics metrics()' call authorizer's monitor(metrics: Metrics) which has default implementation after authorizer configured. Old version of Authorizer plugins will not call  the new authorizer plugins doesn't call the monitor method, so it is backward to old version of Authorizer authorizer plugins.    

Test Plan

Test total number of acls created match with the number from metrics

Test authorization request rate per minute

Test authorization allowed rate per minute

Test authorization denied rate per minute 

Rejected Alternatives

Authorizer plugins create and configure own Metrics and JmxReport. Using this alternative approach make collecting Kafka metrics in Authorizer plugin separate from broker metrics that cause disconnection. Users need implement and manage own Metrics and JmsReport with extra effort

Pass the instance of Metrics through AuthorizerServerInfo interface. The instance of Metrics should not be part of Authorizer server information. We use an interface to expose Kafka Metrics so that other broker's or client's plugins could potentially implement the interface