
Status

Current state: [Under Discussion]

Discussion thread: here 

Vote thread: here


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Authorizer plugins currently have no access to the broker's runtime Metrics instance, so broker metrics and authorizer metrics are managed separately in the current Kafka implementation. Because an authorizer runs as a plugin of the broker, it could use the same Metrics instance as the broker, and then it would not need to handle tasks such as creating and configuring its own Metrics and JmxReporter. With this KIP, an authorizer plugin can use the broker's Metrics instance to create and manage Kafka metrics easily.


Public Interfaces

The AuthorizerServerInfo interface provides runtime broker configuration to authorization plugins, including the cluster resource, broker id, cluster id and endpoint information. This KIP adds a new method, 'Metrics metrics()', to this interface so that plugins can access the broker's Metrics instance.

Broker Runtime Config
package org.apache.kafka.server.authorizer;

import java.util.Collection;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.metrics.Metrics;

/**
 * Runtime broker configuration metadata provided to authorizers during start up.
 */
@InterfaceStability.Evolving
public interface AuthorizerServerInfo {

    /**
     * Returns cluster metadata for the broker running this authorizer including cluster id.
     */
    ClusterResource clusterResource();

    /**
     * Returns broker id. This may be a generated broker id if `broker.id` was not configured.
     */
    int brokerId();

    /**
     * Returns endpoints for all listeners including the advertised host and port to which
     * the listener is bound.
     */
    Collection<Endpoint> endpoints();

    /**
     * Returns the inter-broker endpoint. This is one of the endpoints returned by {@link #endpoints()}.
     */
    Endpoint interBrokerEndpoint();

    // Confluent extensions to Authorizer API

    /**
     * Returns the instance of {@link Metrics} in this broker.
     */
    default Metrics metrics() {
        return null;
    }
}


Proposed Changes

Both the Broker object and the Broker class in kafka.cluster.Broker will be updated to accept a new parameter referring to the Kafka Metrics instance.

kafka.cluster.Broker
object Broker {
  private[cluster] case class ServerInfo(clusterResource: ClusterResource,
                                         brokerId: Int,
                                         endpoints: util.List[Endpoint],
                                         interBrokerEndpoint: Endpoint,
                                         override val metrics: Metrics) extends AuthorizerServerInfo
}

case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String]) {
  ......
  def toServerInfo(clusterId: String, config: KafkaConfig, metrics: Metrics): AuthorizerServerInfo = {
    val clusterResource: ClusterResource = new ClusterResource(clusterId)
    val interBrokerEndpoint: Endpoint = endPoint(config.interBrokerListenerName).toJava
    val brokerEndpoints: util.List[Endpoint] = endPoints.toList.map(_.toJava).asJava
    Broker.ServerInfo(clusterResource, id, brokerEndpoints, interBrokerEndpoint, metrics)
  }
}


kafka.server.KafkaServer will be updated to pass the Metrics instance to the start method of the Authorizer.

kafka.server.KafkaServer
/* Get the authorizer and initialize it if one is specified.*/
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
  case Some(authZ) =>
    authZ.start(brokerInfo.broker.toServerInfo(clusterId, config, metrics)).asScala.map { case (ep, cs) =>
      ep -> cs.toCompletableFuture
    }
  case None =>
    brokerInfo.broker.endPoints.map { ep =>
      ep.toJava -> CompletableFuture.completedFuture[Void](null)
    }.toMap
}


kafka.security.authorizer.AclAuthorizer will be updated to add the following Kafka metrics:
Metrics group name: "kafka.security.authorizer.metrics"
Metric "acls-total-count" reports the total number of ACLs created.
Metric "authorization-request-rate-per-minute" reports the number of authorization requests per minute.
Metric "authorization-allowed-rate-per-minute" reports the number of authorizations allowed per minute.
Metric "authorization-denied-rate-per-minute" reports the number of authorizations denied per minute.

kafka.security.authorizer.AclAuthorizer
class AclAuthorizer extends Authorizer with Logging {
  ......
  private var metricsGroup: AuthorizerMetrics = _
  ......
  override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
    // Register the authorizer metrics on the broker's shared Metrics instance
    metricsGroup = new AuthorizerMetrics(serverInfo.metrics())
    serverInfo.endpoints.asScala.map { endpoint =>
      endpoint -> CompletableFuture.completedFuture[Void](null)
    }.toMap.asJava
  }
  ......

  private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
    ......
    // Record metrics: the outcome first, then the request itself
    if (authorized) {
      metricsGroup.recordAuthorizationAllowed()
    } else {
      metricsGroup.recordAuthorizationDenied()
    }
    metricsGroup.recordAuthorizationRequest()
    logAuditMessage(requestContext, action, authorized)
    if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
  }

  class AuthorizerMetrics(metrics: Metrics) {
    val GROUP_NAME = "kafka.security.authorizer.metrics"
    val authorizationAllowedSensor = metrics.sensor("authorizer-authorization-allowed")
    authorizationAllowedSensor.add(metrics.metricName("authorization-allowed-rate-per-minute", GROUP_NAME,
      "The number of authorizations allowed per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationDeniedSensor = metrics.sensor("authorizer-authorization-denied")
    authorizationDeniedSensor.add(metrics.metricName("authorization-denied-rate-per-minute", GROUP_NAME,
      "The number of authorizations denied per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    val authorizationRequestSensor = metrics.sensor("authorizer-authorization-request")
    authorizationRequestSensor.add(metrics.metricName("authorization-request-rate-per-minute", GROUP_NAME,
      "The number of authorization requests per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()))

    // Gauge reporting the current size of the ACL cache
    metrics.addMetric(metrics.metricName("acls-total-count", GROUP_NAME, "The number of acls defined"), (config, now) => aclCache.size)

    def recordAuthorizationAllowed(): Unit = {
      authorizationAllowedSensor.record()
    }

    def recordAuthorizationDenied(): Unit = {
      authorizationDeniedSensor.record()
    }

    def recordAuthorizationRequest(): Unit = {
      authorizationRequestSensor.record()
    }
  }
}
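
The recording pattern in authorizeAction can be illustrated without the Kafka classes. The following is only a simplified sketch: AuthorizationCounters and CountersDemo are hypothetical names, and plain counters stand in for the Kafka sensors, so no per-minute rate is computed here. It shows that every authorization records exactly one outcome (allowed or denied) plus one request, so the request count always equals the sum of the two outcome counts.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the three sensors: plain counters, not Kafka Rate sensors.
final class AuthorizationCounters {
    private final LongAdder allowed = new LongAdder();
    private final LongAdder denied = new LongAdder();
    private final LongAdder requests = new LongAdder();

    // Mirrors the order used in authorizeAction: record the outcome, then the request.
    void record(boolean authorized) {
        if (authorized) {
            allowed.increment();
        } else {
            denied.increment();
        }
        requests.increment();
    }

    long allowed() { return allowed.sum(); }
    long denied() { return denied.sum(); }
    long requests() { return requests.sum(); }
}

final class CountersDemo {
    public static void main(String[] args) {
        AuthorizationCounters counters = new AuthorizationCounters();
        boolean[] outcomes = {true, true, false, true, false};
        for (boolean outcome : outcomes) {
            counters.record(outcome);
        }
        System.out.println(counters.allowed() + " allowed, "
                + counters.denied() + " denied, "
                + counters.requests() + " requests");
    }
}
```

This invariant (requests = allowed + denied) is also a convenient sanity check when testing the three rate metrics.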

Compatibility, Deprecation, and Migration Plan

The broker will start Authorizer plugins with an AuthorizerServerInfo that contains the new method 'Metrics metrics()', which has a default implementation. Existing Authorizer plugins never call the new method, so the change is backward compatible with them.
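
The role of the default implementation can be shown with a small self-contained sketch. None of the types below are Kafka classes: StubMetrics, StubServerInfo, LegacyServerInfo, UpdatedServerInfo and PluginSketch are hypothetical stand-ins used only for illustration. The sketch assumes that an implementation written before the new method existed still compiles unchanged, and that a plugin which does call metrics() should be prepared for the null returned by the default.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.common.metrics.Metrics.
final class StubMetrics {
    final String name;
    StubMetrics(String name) { this.name = name; }
}

// Simplified stand-in for AuthorizerServerInfo: one existing method plus the new default method.
interface StubServerInfo {
    int brokerId();

    // New method with a default body, mirroring the 'metrics()' addition in this KIP.
    default StubMetrics metrics() {
        return null;
    }
}

// An implementation written before metrics() existed; it compiles without changes.
final class LegacyServerInfo implements StubServerInfo {
    @Override public int brokerId() { return 1; }
}

// An implementation updated to hand out a shared metrics instance.
final class UpdatedServerInfo implements StubServerInfo {
    private final StubMetrics metrics = new StubMetrics("broker-metrics");
    @Override public int brokerId() { return 2; }
    @Override public StubMetrics metrics() { return metrics; }
}

final class PluginSketch {
    // A plugin that wants metrics guards against the null default.
    static String describe(StubServerInfo info) {
        return Optional.ofNullable(info.metrics())
                .map(m -> "using " + m.name)
                .orElse("metrics unavailable");
    }

    public static void main(String[] args) {
        System.out.println(describe(new LegacyServerInfo()));
        System.out.println(describe(new UpdatedServerInfo()));
    }
}
```

Returning null from the default keeps the interface change source compatible, at the cost of a null check in any plugin that might run against an implementation that does not override the method.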

Test Plan

Test that the total number of ACLs created matches the number reported by the metric

Test authorization request rate per minute

Test authorization allowed rate per minute

Test authorization denied rate per minute 

Rejected Alternatives

Authorizer plugins create and configure their own Metrics and JmxReporter. With this alternative, the metrics collected in an Authorizer plugin stay separate from the broker metrics, which causes the disconnection described in the Motivation. Users would also need to implement and manage their own Metrics and JmxReporter, which takes extra effort.





