Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleJava Authorizer Interface
package org.apache.kafka.server.authorizer;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclBindingEndpoint;
import org.apache.kafka.common.acl.AclBindingFilterAclBinding;
import org.apache.kafka.common.annotationacl.InterfaceStabilityAclBindingFilter;
import org.apache.kafka.common.KafkaRequestContext;
import org.apache.kafka.server.ServerInfo;
import org.apache.kafka.server.ServerInfo.Endpointannotation.InterfaceStability;

/**
 *
 * Pluggable authorizer interface for Kafka brokers.
 *
 * Startup sequence in brokers:
 * <ol>
 *   <li>Broker creates authorizer instance if configured in `authorizer.class.name`.</li>
 *   <li>Broker configures and starts authorizer instance. Authorizer implementation starts loading its metadata.</li>
 *   <li>Broker starts SocketServer to accept connections and process requests.</li>
 *   <li>For each listener, SocketServer waits for authorization metadata to be available in the
 *       authorizer before accepting connections. The future returned by {@link #start(ServerInfoAuthorizerServerInfo)}
 *       must return only when authorizer is ready to authorize requests on the listener.</li>
 *   <li>Broker accepts connections. For each connection, broker performs authentication and then accepts Kafka requests.
 *       For each request, broker invokes {@link #authorize(KafkaRequestContextAuthorizableRequestContext, List)} to authorize
 *       actions performed by the request.</li>
 * </ol>
 *
 * Authorizer implementation class may optionally implement @{@link org.apache.kafka.common.Reconfigurable}
 * to enable dynamic reconfiguration without restarting the broker.
 * <p>
 * <b>Thread safety:</b> All authorizer operations including authorization and ACL updates must be thread-safe.
 * </p>
 */
@InterfaceStability.Evolving
public interface Authorizer extends Configurable, Closeable {

    /**
     * Starts loading authorization metadata and returns futures that can be used to wait until
     * metadata for authorizing requests on each listener is available. Each listener will be
     * started only after its metadata is available and authorizer is ready to start authorizing
     * requests on that listener.
     *
     * @param serverInfo Metadata for the broker including broker id and listener endpoints
     * @return CompletableFutures for each endpoint that completes when authorizer is ready to
     *         start authorizing requests on that listener.
     */
    Map<Endpoint, CompletableFuture<Void>> start(ServerInfoAuthorizerServerInfo serverInfo);

    /**
     * Authorizes the specified action. Additional metadata for the action is specified
     * in `requestContext`.
     *
     * @param requestContext Request context including request type, security protocol and listener name
     * @param actions Actions being authorized including resource and operation for each action
     * @return List of authorization results for each action in the same order as the provided actions
     */
    List<AuthorizationResult> authorize(KafkaRequestContextAuthorizableRequestContext requestContext, List<Action> actions);

    /**
     * Creates new ACL bindings.
     *
     * @param requestContext Request context if the ACL is being created by a broker to handle
     *        a client request to create ACLs. This may be null if ACLs are created directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindings ACL bindings to create
     *
     * @return Create result for each ACL binding in the same order as in the input list
     */
    List<AclCreateResult> createAcls(KafkaRequestContextAuthorizableRequestContext requestContext, List<AclBinding> aclBindings);

    /**
     * Deletes all ACL bindings that match the provided filters.
     *
     * @param requestContext Request context if the ACL is being deleted by a broker to handle
     *        a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindingFilters Filters to match ACL bindings that are to be deleted
     *
     * @return Delete result for each filter in the same order as in the input list.
     *         Each result indicates which ACL bindings were actually deleted as well as any
     *         bindings that matched but could not be deleted.
     */
    List<AclDeleteResult> deleteAcls(KafkaRequestContextAuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters);

    /**
     * Returns ACL bindings which match the provided filter.
     *
     * @return Iterator for ACL bindings, which may be populated lazily.
     */
    Iterable<AclBinding> acls(AclBindingFilter filter);
}


Request context will be provided to authorizers using a new interface KafkaRequestContext AuthorizableRequestContext. The existing class org.apache.kafka.common.requests.RequestContext will implement this interface. New methods may be added to this interface in future, so mock implementations using this interface should adapt to these changes.

Code Block
languagejava
titleRequest Context
package org.apache.kafka.server.commonauthorizer;

import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Request context interface that provides data from request header as well as connection
 * and authentication information to plugins.
 */
@InterfaceStability.Evolving
public interface KafkaRequestContextAuthorizableRequestContext {

    /**
     * Returns name of listener on which request was received.
     */
    String listener();

    /**
     * Returns the security protocol for the listener on which request was received.
     */
    SecurityProtocol securityProtocol();

    /**
     * Returns authenticated principal for the connection on which request was received.
     */
    KafkaPrincipal principal();

    /**
     * Returns client IP address from which request was sent.
     */
    InetAddress clientAddress();

    /**
     * 16-bit API key of the request from the request header. See
     * https://kafka.apache.org/protocol#protocol_api_keys for request types.
     */
    int requestType();

    /**
     * Returns a name for the the request type. For fetch requests, version from the metrics names
     * FetchFollower and FetchConsumer will be used to distinguish between
     * replica fetch requests and client fetch requestsrequest header.
     */
    Stringint requestNamerequestVersion();

    /**
     * Returns the requestclient versionid from the request header.
     */
    intString requestVersionclientId();

    /**
     * Returns the clientcorrelation id from the request header.
     */
    Stringint clientIdcorrelationId();

    /**
     * Returns the correlation id from the request header.
     */
    int correlationId();
}

}


AuthorizerServerInfo ServerInfo provides runtime broker configuration to authorization plugins including broker id, cluster id and endpoint information. New methods may be added to this interface in future, so mock implementations using this interface should adapt to these changes.

Code Block
languagejava
titleBroker Runtime Config
package org.apache.kafka.server.authorizer;

import java.util.Collection;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.annotation.InterfaceStabilityEndpoint;
import org.apache.kafka.common.securityannotation.auth.SecurityProtocolInterfaceStability;

/**
 * Runtime broker configuration metadata for broker-side pluginsprovided to authorizers during start up.
 */
@InterfaceStability.Evolving
public interface ServerInfoAuthorizerServerInfo {

    interface Endpoint {

/**
     * Returns cluster String listener();

        SecurityProtocol securityProtocol();

metadata for the broker running this authorizer including cluster id.
     */
    StringClusterResource hostclusterResource();

    /**
    int port();
* Returns broker id. }

This may be  ClusterResource clusterResource();
a generated broker id if `broker.id` was not configured.
     */
    int brokerId();

    /**
    Collection<Endpoint> endpoints();
 * Returns endpoints for all listeners including the advertised host and port to which
     * the listener is bound.
    Endpoint interBrokerEndpoint(); */
    Collection<Endpoint> endpoints();

    /**
     * Returns the inter-broker endpoint. This is one of the endpoints returned by {@link #endpoints()}.
     */
    Endpoint interBrokerEndpoint();
}


Endpoint is added as a common class so that it may be reused in several places in the code where we use this abstraction.

Code Block
languagejava
titleEndpoint
/**
 * Represents a broker endpoint.
 */
public class Endpoint {

    private final String listener;
    private final SecurityProtocol securityProtocol;
    private final String host;
    private final int port;

    public Endpoint(String listener, SecurityProtocol securityProtocol, String host, int port) {
        this.listener = listener;
        this.securityProtocol = securityProtocol;
        this.host = host;
        this.port = port;
    }

    /**
     * Returns the listener name of this endpoint.
     */
    public String listener() {
        return listener;
    }

    /**
     * Returns the security protocol of this endpoint.
     */
    public SecurityProtocol securityProtocol() {
        return securityProtocol;
    }

    /**
     * Returns advertised host name of this endpoint.
     */
    public String host() {
        return host;
    }

    /**
     * Returns the port to which the listener is bound.
     */
    public int port() {
        return port;
    }
}



Action  provides details of the action being authorized including resource and operation. Additional context including audit flag indicating authorization usage are also included, enabling access violation to be distinguished from resource filtering or optional ACLs.

...