Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleJava Authorizer Interface
package org.apache.kafka.server.authorizer;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.KafkaRequestContext;
import org.apache.kafka.server.BrokerInfo;
import org.apache.kafka.server.BrokerInfo.Endpoint;

/**
 *
 * Pluggable authorizer interface for Kafka brokers.
 *
 * Startup sequence in brokers:
 * <ol>
 *   <li>Broker creates authorizer instance if configured in `authorizer.class.name`.</li>
 *   <li>Broker configures and starts authorizer instance. Authorizer implementation starts loading its metadata.</li>
 *   <li>Broker starts SocketServer to accept connections and process requests.</li>
 *   <li>For each listener, SocketServer waits for authorization metadata to be available in the
 *       authorizer before accepting connections. The future returned by {@link #start(ClusterResource, Map)} BrokerInfo)}
 *       must return only when authorizer is ready to authorize requests on the listener.</li>
 *   <li>Broker accepts connections. For each connection, broker performs authentication and then accepts Kafka requests.
 *       For each request, broker invokes {@link #authorize(KafkaRequestContext, List)} to authorize
 *       actions performed by the request.</li>
 * </ol>
 *
 * Authorizer implementation class may optionally implement @{@link org.apache.kafka.common.Reconfigurable}
 * to enable dynamic reconfiguration without restarting the broker.
 * <p>
 * <b>Thread safety:</b> All authorizer operations including authorization and ACL updates must be thread-safe.
 * </p>
 */
@InterfaceStability.Evolving
public interface Authorizer extends Configurable, Closeable {

    /**
     * Starts loading authorization metadata and returns futures that can be used to wait until
     * metadata for authorizing requests on each listener is available. Each listener will be
     * started only after its metadata is available and authorizer is ready to start authorizing
     * requests on that listener.
     *
     * @param clusterResourcebrokerInfo ClusterMetadata metadata for the Kafkabroker cluster
including broker id and  * @param listeners Listener names with their security protocolslistener endpoints
     * @return CompletableFutures for each listenerendpoint that completes when authorizer is ready to
     *         start authorizing requests on that listener.
 Returned map contains one future*/
    Map<Endpoint, *CompletableFuture<Void>>   start(BrokerInfo brokerInfo);

    /**
  for each listener name* inAuthorizes the input `listeners` map.
     */specified action. Additional metadata for the action is specified
    Map<String, CompletableFuture<Void>>* start(ClusterResource clusterResource,in `requestContext`.
     *
     * @param requestContext Request context including request type, security protocol and listener name
     * @param actions Actions being authorized including resource and operation for each action
     * @return List Map<String,of SecurityProtocol> listeners);

    /**authorization results for each action in the same order as the provided actions
     */
 Authorizes  the specified action. Additional metadata for the action is specifiedList<AuthorizationResult> authorize(KafkaRequestContext requestContext, List<Action> actions);

    /**
     * Creates innew ACL `requestContext`bindings.
     *
     * @param requestContext Request context includingif requestthe type,ACL securityis protocolbeing andcreated listenerby name
a broker to handle
  * @param actions Actions* being authorized including resource and operation for eacha action
client request to create ACLs. *This @returnmay Listbe ofnull authorizationif resultsACLs forare eachcreated actiondirectly in the same order as the provided actionsZooKeeper
     */
    List<AuthorizationResult> authorize(KafkaRequestContext requestContext, List<Action> actions);

    /**using AclCommand.
     * Creates@param newaclBindings ACL bindings. to create
     *
     * @param@return Create requestContextresult Requestfor contexteach ifACL thebinding ACLin isthe beingsame createdorder byas ain brokerthe toinput handlelist
     */
    List<AclCreateResult> createAcls(KafkaRequestContext requestContext,  a client request to create ACLs. This may be null if ACLs are created directly in ZooKeeperList<AclBinding> aclBindings);

    /**
     * Deletes all ACL bindings that match the provided filters.
     *
     * @param requestContext usingRequest AclCommand.
context if the ACL is *being @paramdeleted aclBindingsby ACLa bindingsbroker to createhandle
     *
     * @return Create resulta forclient eachrequest ACLto bindingdelete inACLs. theThis samemay orderbe asnull inif theACLs inputare list
deleted directly in ZooKeeper
  */
   * List<AclCreateResult> createAcls(KafkaRequestContext requestContext, List<AclBinding> aclBindings);

   using /**AclCommand.
     * Deletes@param allaclBindingFilters ACLFilters bindings to match ACL bindings that matchare theto providedbe filters.deleted
     *
     * @param@return requestContextDelete Requestresult contextfor ifeach thefilter ACLin isthe beingsame deletedorder byas ain brokerthe toinput handlelist.
     *        a clientEach requestresult toindicates deletewhich ACLs.ACL Thisbindings maywere beactually nulldeleted ifas ACLswell are deleted directly in ZooKeeperas any
     *        using AclCommand.
bindings that matched but could  * @param aclBindingFilters Filters to match ACL bindings that are to be deletednot be deleted.
     */
    List<AclDeleteResult> deleteAcls(KafkaRequestContext requestContext, List<AclBindingFilter> aclBindingFilters);

     /**
     * @returnReturns DeleteACL resultbindings forwhich eachmatch filter in the sameprovided orderfilter.
 as in the input list.*
     * @return Iterator for ACL bindings, which may be Eachpopulated resultlazily.
 indicates which ACL bindings were actually deleted as well as any
     *         bindings that matched but could not be deleted.
     * */
    List<AclDeleteResult>Iterable<AclBinding> deleteAclsacls(KafkaRequestContext requestContext, List<AclBindingFilter> aclBindingFilters);

    /**
     * Returns ACL bindings which match the provided filter.
     AclBindingFilter filter);
}


Request context will be provided to authorizers using a new interface KafkaRequestContext. The existing class org.apache.kafka.common.requests.RequestContext will implement this interface.

Code Block
languagejava
titleRequest Context
package org.apache.kafka.common;

import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Request context interface that provides data from request header as well as connection
 * and authentication information to plugins.
 */
@InterfaceStability.Evolving
public interface KafkaRequestContext {

    /**
     * Returns name of listener on which request was received.
     */
    String listener();

    /**
     * Returns @returnthe security Iteratorprotocol for the ACLlistener bindings,on which mayrequest bewas populated lazilyreceived.
     */
    Iterable<AclBinding> acls(AclBindingFilter filter);
}

Request context will provided to authorizers using a new interface KafkaRequestContext. The existing class org.apache.kafka.common.requests.RequestContext will implement this interface.

Code Block
languagejava
titleRequest Context
package org.apache.kafka.common;

import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Request context interface that provides data from request header as well as connection
 * and authentication information to plugins.
 */
@InterfaceStability.Evolving
public interface KafkaRequestContext {

    /**
     * Returns name of listener on which request was receivedSecurityProtocol securityProtocol();

    /**
     * Returns authenticated principal for the connection on which request was received.
     */
    KafkaPrincipal principal();

    /**
     * Returns client IP address from which request was sent.
     */
    InetAddress clientAddress();

    /**
     * 16-bit API key of the request from the request header. See
     * https://kafka.apache.org/protocol#protocol_api_keys for request types.
     */
    Stringint listenerNamerequestType();

    /**
     * Returns thea securityname protocol for the request type. listenerFor onfetch whichrequests, requestthe wasmetrics received.names
     */
 FetchFollower and FetchConsumer SecurityProtocol securityProtocol();

    /**will be used to distinguish between
     * Returnsreplica authenticatedfetch principalrequests forand theclient connection on which request was receivedfetch requests.
     */
    KafkaPrincipalString principalrequestName();

    /**
     * Returns clientthe IPrequest addressversion from whichthe request was sentheader.
     */
    InetAddressint clientAddressrequestVersion();

    /**
     * 16-bit API key ofReturns the request from the request header. See
     * https://kafka.apache.org/protocol#protocol_api_keys for request types client id from the request header.
     */
    intString requestTypeclientId();

    /**
     * Returns a name for the request type. For fetch requests, the metrics names
     * FetchFollower and FetchConsumer will be used to distinguish between
     * replica fetch requests and client fetch requests.
     */
    String requestName();

    /**
     * Returns the request version from the request header.
     */
    int requestVersion the correlation id from the request header.
     */
    int correlationId();
}


BrokerInfo provides runtime broker configuration to authorization plugins including broker id, cluster id and endpoint information.

Code Block
languagejava
titleBroker Runtime Config
package org.apache.kafka.server;

import java.util.Collection;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Runtime broker configuration metadata for broker-side plugins.
 */
@InterfaceStability.Evolving
public interface BrokerInfo {

    interface Endpoint {

        String listener();

    /**
    SecurityProtocol securityProtocol();

 * Returns the client id from the request header.String host();

     */
   int String clientIdport();

    /**}

     * Returns the correlation id from the request header.ClusterResource clusterResource();

    int brokerId();

     */Collection<Endpoint> endpoints();

    intEndpoint correlationIdinterBrokerEndpoint();
}


Action provides details of the action being authorized, including resource and operation. Additional context, including an audit flag indicating authorization usage, is also included, enabling access violations to be distinguished from resource filtering or optional ACLs.

...

Code Block
languagejava
titleAudit Flag
package org.apache.kafka.server.authorizer;

/**
 * Indicates how the result of an authorization check is used, so that audit logging can
 * distinguish access violations from resource filtering or optional ACL checks.
 */
public enum AuditFlag {
    /**
     * Access was requested to resource. If authorization result is ALLOWED, access is granted to
     * the resource to perform the request. If DENIED, request is failed with authorization failure.
     * <p>
     * Audit logs tracking ALLOWED access should include this if result is ALLOWED.
     * Audit logs tracking DENIED access should include this if result is DENIED.
     * </p>
     */
    MANDATORY_AUTHORIZE,

    /**
     * Access was requested to resource. If authorization result is ALLOWED, access is granted to
     * the resource to perform the request. If DENIED, alternative authorization rules are applied
     * to determine if access is allowed.
     * <p>
     * For example, topic creation is allowed if user has Cluster:Create
     * permission to create any topic or the fine-grained Topic:Create permission to create topics
     * of the requested name. Cluster:Create is an optional ACL in this case.
     * </p><p>
     * Audit logs tracking ALLOWED access should include this if result is ALLOWED.
     * Audit logs tracking DENIED access can omit this if result is DENIED, since an alternative
     * authorization is used to determine access.
     * </p>
     */
    OPTIONAL_AUTHORIZE,

    /**
     * Access was requested to authorized resources (e.g. to subscribe to regex pattern).
     * Request is performed on resources whose authorization result is ALLOWED and the rest of
     * the resources are filtered out.
     * <p>
     * Audit logs tracking ALLOWED access should include this if result is ALLOWED.
     * Audit logs tracking DENIED access can omit this if result is DENIED since access was not
     * actually requested for the specified resource and it is filtered out.
     * </p>
     */
    FILTER,

    /**
     * Request to list authorized operations. No access is actually performed by this request
     * based on the authorization result.
     * <p>
     * Audit logs tracking ALLOWED/DENIED access can omit these since no access is performed
     * as a result of this.
     * </p>
     */
    LIST_AUTHORIZED
}

...