Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka authorization plugins need access to the broker's runtime Metrics instance. In the current Kafka implementation, the broker and its authorization plugins manage metrics separately, so each plugin has to create and configure its own Metrics and JmxReporter. Because an authorization plugin runs as part of the broker, it could use the same Metrics instance as the broker and would no longer need to handle those tasks itself.


Public Interfaces

The AuthorizerServerInfo interface provides runtime broker configuration to authorization plugins, including the cluster resource, broker id, cluster id and endpoint information. This proposal adds a new method, 'Metrics metrics()', to the interface so that plugins can access the broker's Metrics instance.

Broker Runtime Config
package org.apache.kafka.server.authorizer;

import java.util.Collection;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.metrics.Metrics;

/**
 * Runtime broker configuration metadata provided to authorizers during start up.
 */
@InterfaceStability.Evolving
public interface AuthorizerServerInfo {

    /**
     * Returns cluster metadata for the broker running this authorizer including cluster id.
     */
    ClusterResource clusterResource();

    /**
     * Returns broker id. This may be a generated broker id if `broker.id` was not configured.
     */
    int brokerId();

    /**
     * Returns endpoints for all listeners including the advertised host and port to which
     * the listener is bound.
     */
    Collection<Endpoint> endpoints();

    /**
     * Returns the inter-broker endpoint. This is one of the endpoints returned by {@link #endpoints()}.
     */
    Endpoint interBrokerEndpoint();

    // Confluent extensions to Authorizer API

    /**
     * Returns the instance of {@link Metrics} in this broker.
     */
    default Metrics metrics() {
        return null;
    }
}
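
Because the new method is a default method that returns null, implementations of AuthorizerServerInfo written before this change keep compiling, and a plugin should be prepared for a null result. The sketch below illustrates that behaviour with simplified stand-in types: Metrics and ServerInfo here are not the real Kafka classes, and resolveMetrics is a hypothetical helper name, not part of this proposal.

```java
import java.util.Objects;

// Illustrative sketch only: the nested types are simplified stand-ins,
// not the real org.apache.kafka classes.
class MetricsAccessSketch {

    /** Stand-in for org.apache.kafka.common.metrics.Metrics. */
    static final class Metrics {
        final String owner;
        Metrics(String owner) { this.owner = owner; }
    }

    /** Stand-in for AuthorizerServerInfo, reduced to the new method. */
    interface ServerInfo {
        default Metrics metrics() {
            return null; // same default as in the proposed interface
        }
    }

    /** Written before the change: compiles unchanged, metrics() yields null. */
    static final class LegacyServerInfo implements ServerInfo { }

    /** Passes the broker's Metrics instance through to the plugin. */
    static final class BrokerServerInfo implements ServerInfo {
        private final Metrics metrics;
        BrokerServerInfo(Metrics metrics) { this.metrics = Objects.requireNonNull(metrics); }
        @Override public Metrics metrics() { return metrics; }
    }

    /** Hypothetical plugin-side helper: prefer broker metrics, else a plugin-owned instance. */
    static Metrics resolveMetrics(ServerInfo info, Metrics pluginOwned) {
        Metrics fromBroker = info.metrics();
        return fromBroker != null ? fromBroker : pluginOwned;
    }

    public static void main(String[] args) {
        Metrics broker = new Metrics("broker");
        Metrics own = new Metrics("plugin");
        System.out.println(resolveMetrics(new BrokerServerInfo(broker), own).owner); // prints "broker"
        System.out.println(resolveMetrics(new LegacyServerInfo(), own).owner);       // prints "plugin"
    }
}
```

Falling back to a plugin-owned instance is only one possible policy; a plugin could equally choose to fail fast when no broker Metrics is supplied.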


Proposed Changes

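Both the Broker companion object and the Broker case class in kafka.cluster.Broker (Broker.scala) will be updated to accept a new parameter referring to Metrics: the ServerInfo case class gains a metrics field, and toServerInfo gains a metrics argument.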

Broker.scala
object Broker {
  private[cluster] case class ServerInfo(clusterResource: ClusterResource,
                                         brokerId: Int,
                                         endpoints: util.List[Endpoint],
                                         interBrokerEndpoint: Endpoint,
                                         metrics: Metrics) extends AuthorizerServerInfo
}

case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String]) {
  ......
  def toServerInfo(clusterId: String, config: KafkaConfig, metrics: Metrics): AuthorizerServerInfo = {
    val clusterResource: ClusterResource = new ClusterResource(clusterId)
    val interBrokerEndpoint: Endpoint = endPoint(config.interBrokerListenerName).toJava
    val brokerEndpoints: util.List[Endpoint] = endPoints.toList.map(_.toJava).asJava
    Broker.ServerInfo(clusterResource, id, brokerEndpoints, interBrokerEndpoint, metrics)
  }
}


kafka.server.KafkaServer will be updated to pass its Metrics instance, via toServerInfo, to the start method of the Authorizer.

KafkaServer
/* Get the authorizer and initialize it if one is specified.*/
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
  case Some(authZ) =>
    authZ.start(brokerInfo.broker.toServerInfo(clusterId, config, metrics)).asScala.map { case (ep, cs) =>
      ep -> cs.toCompletableFuture
    }
  case None =>
    brokerInfo.broker.endPoints.map { ep =>
      ep.toJava -> CompletableFuture.completedFuture[Void](null)
    }.toMap
}
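
The start-up logic above can be summarised as: if an authorizer is configured, each endpoint gets the future returned by its start method; otherwise each endpoint gets an already completed future. The following Java sketch mirrors that branching with stand-in types only. Endpoints are plain strings, and StartableAuthorizer and startupFutures are hypothetical names rather than Kafka APIs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Illustrative sketch only: mirrors the Some/None branching of the Scala snippet.
class AuthorizerStartupSketch {

    /** Stand-in for an authorizer's start call: one completion stage per endpoint. */
    interface StartableAuthorizer {
        Map<String, CompletionStage<Void>> start();
    }

    static Map<String, CompletableFuture<Void>> startupFutures(
            Optional<StartableAuthorizer> authorizer, List<String> endpoints) {
        Map<String, CompletableFuture<Void>> futures = new LinkedHashMap<>();
        if (authorizer.isPresent()) {
            // Authorizer configured: adopt the stages it returns from start().
            authorizer.get().start()
                    .forEach((ep, stage) -> futures.put(ep, stage.toCompletableFuture()));
        } else {
            // No authorizer: every endpoint is immediately ready.
            for (String ep : endpoints) {
                futures.put(ep, CompletableFuture.<Void>completedFuture(null));
            }
        }
        return futures;
    }

    public static void main(String[] args) {
        List<String> endpoints = List.of("PLAINTEXT", "SSL");
        Map<String, CompletableFuture<Void>> ready = startupFutures(Optional.empty(), endpoints);
        System.out.println(ready.get("SSL").isDone()); // prints "true"
    }
}
```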

    

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
