Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleJava Authorizer Interface
package org.apache.kafka.server.authorizer;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Setconcurrent.CompletableFuture;
import javaorg.apache.utilkafka.concurrentcommon.CompletableFutureClusterResource;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AccessControlEntryAclBinding;
import org.apache.kafka.common.annotationacl.InterfaceStabilityAclBindingFilter;
import org.apache.kafka.common.resourceannotation.ResourcePatternInterfaceStability;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.KafkaRequestContext;

/**
 *
 * Pluggable authorizer interface for Kafka brokers.
 *
 * Startup sequence in brokers:
 * <ol>
 *   <li>Broker creates authorizer instance if configured in `authorizer.class` or `authorizer.class.name`.</li>
 *   <li>Broker configures and starts authorizer instance. Authorizer implementation starts loading its metadata.</li>
 *   <li>Broker starts SocketServer to accept connections and process requests.</li>
 *   <li>For each listener, SocketServer waits for authorization metadata to be available in the
 *       authorizer before accepting connections. The future returned by {@link #start(ClusterResource, Map)}
 *       must return only when authorizer is ready to authorize requests on the listener.</li>
 *   <li>Broker accepts connections. For each connection, broker performs authentication and then accepts Kafka requests.
 *       For each request, broker invokes {@link #authorize(KafkaRequestContext, List)} to authorize
 *       actions performed by the request.</li>
 * </ol>
 *
 * Authorizer implementation class may optionally implement the following interfaces:
 * <ul>
 *   <li>@@{@link org.apache.kafka.common.Reconfigurable}
 * to enable dynamic reconfiguration
 *   without restarting the broker.</li>
 * <p>
 *  <li>{@link org.apache.kafka.common.ClusterResourceListener} to obtain cluster id</li><b>Thread safety:</b> All authorizer operations including authorization and ACL updates must be thread-safe.
 * </ul>p>
 */
@InterfaceStability.Evolving
public interface Authorizer extends Configurable, Closeable {

    /**
     * Starts loading authorization metadata and returns futures that can be used to wait until
     * metadata for authorizing requests on each listener is available. Each listener will be
     * started only after its metadata is available and authorizer is ready to start authorizing
     * requests on that listener.
     *
     * @param clusterResource Cluster metadata for the Kafka cluster
     * @param listeners Listener names with their security protocols
     * @return CompletableFutures for each listener that completes when authorizer is ready to
     *         start authorizing requests on that listener. Returned map contains one future
     *         for each listener name in the input `listeners` map.
     */
    Map<String, CompletableFuture<Void>> start(Map<StringClusterResource clusterResource,
 SecurityProtocol>  listeners);

    /**
     * Authorizes the specified action. Additional metadata for the action is                          Map<String, SecurityProtocol> listeners);

    /**
     * Authorizes the specified action. Additional metadata for the action is specified
     * in `requestContext`.
     *
     * @param requestContext Request context including request type, security protocol and listener name
     * @param actions Actions being authorized including resource and operation for each action
     * @return List of authorization results for each action in the same order as the provided actions
     */
    List<AuthorizationResult> authorize(KafkaRequestContext requestContext, List<Action> actions);

    /**
     * Creates new ACL bindings.
     *
     * @param requestContext Request context if the ACL is being created by a broker to handle
     *        a client request to create ACLs. This may be null if ACLs are created directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindings ACL bindings to create
     *
     * @return Create result for each ACL binding in the same order as in the input list
     */
    List<AclCreateResult> createAcls(KafkaRequestContext requestContext, List<AclBinding> aclBindings);

    /**
     * Deletes all ACL bindings that match the provided filters.
     *
     * @param requestContext Request context if the ACL is ResourcePattern resourcePattern,being deleted by a broker to handle
     *        a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper
     *    List<AccessControlEntry> accessControlEntries);

   using /**AclCommand.
     * @param aclBindingFilters DeletesFilters theto specifiedmatch ACL binding.
bindings that are to be *deleted
     *
 @param requestContext Request context if* the@return ACLDelete isresult beingfor deletedeach byfilter ain brokerthe tosame handle
order as in the input *list.
     *   a  client request to delete ACLs.Each Thisresult mayindicates bewhich nullACL ifbindings ACLswere areactually deleted directlyas well inas ZooKeeperany
     *        using AclCommand.
bindings that matched but could *not @parambe resourcePatterndeleted.
 Resource pattern for which ACLs*/
 are being removed   List<AclDeleteResult> deleteAcls(KafkaRequestContext requestContext, List<AclBindingFilter> aclBindingFilters);

    /**
 *  @param accessControlEntries Set* ofReturns accessACL controlbindings entrieswhich tomatch removethe forprovided resourcefilter.
     *
     * @return DeleteIterator result for eachACL accessbindings, controlwhich entrymay inbe the same order as in the input listpopulated lazily.
     */
    Iterable<AclBinding>     Each result indicates if the entry was actually deleted.
     */
    List<AclDeleteResult> deleteAcls(KafkaRequestContext requestContext,
                                     ResourcePattern resourcePattern,
                                     List<AccessControlEntry> accessControlEntriesacls(AclBindingFilter filter);
}


The request context will be provided to authorizers through a new interface, KafkaRequestContext. The existing class org.apache.kafka.common.requests.RequestContext will implement this interface.

Code Block
languagejava
titleRequest Context
package org.apache.kafka.common;

import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Request context interface that provides data from request header as well as connection
 * and authentication information to plugins.
 */
@InterfaceStability.Evolving
public interface KafkaRequestContext {

    /**
     * Returns name of listener on which request was received.
     */
    String listenerName();

    /**
     * Returns the security protocol for the listener on which request was received.
     */
    SecurityProtocol securityProtocol();

    /**
     * DeletesReturns allauthenticated ACL bindingsprincipal for the specifiedconnection resourceon pattern.which Onlyrequest ACLs explicitly configuredwas received.
     */
 with this resource pattern are deleted, no matching is performed.KafkaPrincipal principal();

     /**
     * @paramReturns requestContextclient RequestIP contextaddress iffrom thewhich ACLsrequest are being deleted by a broker to handlewas sent.
     */
    InetAddress clientAddress();

   a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper /**
     * Returns the request type (ApiKey) from the request header. For fetch requests,
     * the metrics names FetchFollower and FetchConsumer will be used to usingdistinguish AclCommand.between
     * @paramreplica resourcePatternfetch Resourcerequests patternand whoseclient ACLsfetch are deletedrequests.
     */
     * @return result that indicates if at least one ACL binding was deleted or if delete failedString requestType();

    /**
     * Returns the request version from the request header.
     */
    AclDeleteResultint deleteAcls(KafkaRequestContext requestContext, ResourcePattern resourcePatternrequestVersion();

    /**
     * Returns allthe ACLclient bindings.
id from the request  *header.
     */
 @return Access control entries of all ACL bindings grouped by resource pattern.
     */
    Map<ResourcePattern, Set<AccessControlEntry>> aclsString clientId();

    /**
     * Returns accessthe controlcorrelation entriesid definedfrom for the specifiedrequest resource patternheader.
     */
     * @param resourcePattern Resource pattern for which ACLs are returned. No matching is performed,
     *        only ACLs defined for the specified pattern are returned.
     *
     * @return ACL bindings for resource pattern.
     */
    Set<AccessControlEntry> acls(ResourcePattern resourcePattern);
}

The request context will be provided to authorizers through a new interface, KafkaRequestContext. The existing class org.apache.kafka.common.requests.RequestContext will implement this interface.

int correlationId();
}


Action provides details of the action being authorized, including the resource and operation. Additional context, including the authorization mode, is also included, enabling access violations to be distinguished from resource filtering or optional ACLs.


Code Block
languagejava
titleAuthorizable Action
package org.apache.kafka.server.authorizer;

import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

@InterfaceStability.Evolving
public class Action {

    private final ResourcePattern resourcePattern;
    private final AclOperation operation;
    private final AuthorizationMode authorizationMode;
    private final int count;

    public Action(AclOperation operation,
                  ResourceType resourceType,
                  String resourceName,
                  AuthorizationMode authorizationMode,
                  int count) {
        this.operation = operation;
        this.resourcePattern = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
        this.authorizationMode = authorizationMode;
        this.count = count;
    }
Code Block
languagejava
titleRequest Context
package org.apache.kafka.common;

import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Request context interface that provides data from request header as well as connection
 * and authentication information to plugins.
 */
@InterfaceStability.Evolving
public interface KafkaRequestContext {

    /**
     * Returns name of listener on which request was received.
     */
    String listenerName();

    /**
     * Returns the security protocol for the listener on which request was received.
     */
    SecurityProtocol securityProtocol();

    /**
     * Returns authenticated principal for the connection on which request was received.
     */
    KafkaPrincipal principal();

    /**
     * Returns client IP address from which request was sent.
     */
    InetAddress clientAddress();

    /**
     * Returns the request type (ApiKey) from the request header. For fetch requests,
     * the metrics names FetchFollower and FetchConsumer will be used to distinguish between
     * replica fetch requests and client fetch requests.
     */
    String requestType();

    /**
     * ReturnsResource theon requestwhich versionaction fromis thebeing request headerperformed.
     */
    intpublic ResourcePattern requestVersionresourcePattern() {
        return resourcePattern;
    }

    /**
     * ReturnsOperation the client id from the request headerbeing performed.
     */
    public StringAclOperation clientIdoperation() {
        return operation;
    }

    /**
     * ReturnsAuthorization themode correlationto idenable fromauthorization thelogs requestto header.
distinguish between    */attempts
    int correlationId();
}

Action provides details of the action being authorized, including the resource and operation. Additional context, including the authorization mode, is also included, enabling access violations to be distinguished from resource filtering or optional ACLs.

  * to access unauthorized resources and other filtering operations performed by the broker.
     */
    public AuthorizationMode authorizationMode() {
        return this.authorizationMode;
    }

    /**
     * Returns how many times the authorization result is used. A single topic
     * authorization result may, for example, be used with `count` partitions of
     * that topic within a single request.
     */
    public int count() {
        return this.count;
    }


Authorization Mode:

Code Block
languagejava
titleAuthorization Mode
package org.apache.kafka.server.authorizer;

public enum AuthorizationMode {
    /**
     * Access was requested to resource. If authorization result is ALLOWED, access is granted to
     * the resource to perform the request. If DENIED, request is failed with authorization failure.
     */
    MANDATORY,

    /**
     * Access was requested to resource. If authorization result is ALLOWED, access is granted to
Code Block
languagejava
titleAuthorizable Action
package org.apache.kafka.server.authorizer;

import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

@InterfaceStability.Evolving
public class Action {

    private final ResourcePattern resourcePattern;
    private final AclOperation operation;
    private final AuthorizationMode authorizationMode;
    private final int count;

    public Action(AclOperation operation,
                  ResourceType resourceType,
                  String resourceName,
                  AuthorizationMode authorizationMode,
                  int count) {
        this.operation = operation;
        this.resourcePattern = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
        this.authorizationMode = authorizationMode;
        this.count = count;
    }

    /**
     * Resourcethe onresource whichto actionperform isthe being performedrequest.
 If DENIED, alternative  */authorization rules are applied
    public ResourcePattern* resourcePattern() {
    to determine if access is allowed.
    return resourcePattern;*/
    }OPTIONAL,

    /**
     * Operation being performed Access was requested to authorized resources (e.g. to subscribe to regex pattern).
     */
 Request is performed publicon AclOperationresources operation() {
        return operation;
    }

    /**whose authorization result is ALLOWED and the rest of
     * the resources are filtered out.
     */
 Authorization mode to enableFILTER,

 authorization logs to distinguish between attempts /**
     * Request to accesslist unauthorizedauthorized resourcesoperations. andNo otheraccess filteringis operationsactually performed by thethis broker.request
     */
 based on the public AuthorizationMode authorizationMode() {authorization result.
     */
   return authorizationMode;
    }

    /**LIST_AUTHORIZED
}


The authorize method returns an individual allowed/denied result for every action. ACL create and delete operations return any exception raised for each access control entry requested.

Code Block
languagejava
titleAuthorizer Operation Results
package org.apache.kafka.server.authorizer;

public enum AuthorizationResult {
    ALLOWED,
 * Number of times the authorization result is used. For example, a single topic
     * authorization result may be used with `count` partitions of the topic within a single DENIED
}

import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Evolving
public class AclCreateResult {
    private final Throwable exception;

    public * request.AclCreateResult() {
     */
   this(null);
 public int count() {}

    public AclCreateResult(Throwable exception) {
  return count;
    }

Authorization Mode:

Code Block
languagejava
titleAuthorization Mode
package org.apache.kafka.server.authorizer;

public enum AuthorizationMode { this.exception = exception;
    }

    /**
     * AccessReturns wasany requestedexception toduring resourcecreate. If authorization resultexception is ALLOWEDnull, accessthe isrequest grantedhas tosucceeded.
     * the resource to perform the request. If DENIED, request is failed with authorization failure/
    public Throwable exception() {
        return this.exception;
    }

    /**
     * Returns true if the request failed.
     */
    MANDATORY,
public boolean failed() {
    /**
    return *exception Access was requested to resource. If authorization result is ALLOWED, access is granted to!= null;
    }
}


Code Block
languagejava
titleDelete Results
package org.apache.kafka.server.authorizer;

import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.errors.ApiException;

public class AclDeleteResult {
    private *final theApiException resourceexception;
 to perform the request. If DENIED, alternative authorization rules are applied private final Set<DeleteResult> deleteResults;

    public * to determine if access is allowed.
AclDeleteResult(ApiException exception) {
         */this(Collections.emptySet(), exception);
    OPTIONAL,}

    /**
     * Access was requested to authorized resources (e.g. to subscribe to regex pattern).public AclDeleteResult(Set<DeleteResult> deleteResults) {
        this(deleteResults, null);
    }

    private * Request is performed on resources whose authorization result is ALLOWED and the rest ofAclDeleteResult(Set<DeleteResult> deleteResults, ApiException exception) {
        this.deleteResults = deleteResults;
     * the resources are filtered outthis.
exception     */= exception;
    FILTER,}

    /**
     * Request to list authorized operations. No access is actually performed by this request
     * based on the authorization resultReturns any exception while attempting to match ACL filter to delete ACLs.
     */
    LIST_AUTHORIZED
}

The authorize method returns an individual allowed/denied result for every action. ACL create and delete operations return any exception raised for each access control entry requested.

Code Block
languagejava
titleAuthorizer Operation Results
package org.apache.kafka.server.authorizer;

public enum AuthorizationResultpublic ApiException exception() {
    ALLOWED,
    DENIED
}

import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Evolving
public class AclCreateResult {return exception;
    }

    private final Throwable exception;

/**
    public AclCreateResult() {
        this(null);
* Returns delete result for each matching ACL binding.
     }
*/
    public AclCreateResult(Throwable exceptionSet<DeleteResult> deleteResults() {
        this.exception = exceptionreturn deleteResults;
    }


    /**
     * ReturnsDelete anyresult exceptionfor duringeach create.ACL Ifbinding exceptionthat ismatched null,a the request has succeededdelete filter.
     */
    public Throwableclass exception()DeleteResult {
        returnprivate exception;
final    }

    /**AclBinding aclBinding;
     * Returns true ifprivate thefinal request failed.
 ApiException exception;

    */
    public boolean failed(DeleteResult(AclBinding aclBinding) {
        return   exception != this(aclBinding, null);
    }
}

import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Evolving
public class AclDeleteResult {}

    private final Throwable exception;
 public DeleteResult(AclBinding aclBinding, privateApiException final boolean deleted;

    public AclDeleteResult(Throwable exception) {
exception) {
            this(false, exception).aclBinding = aclBinding;
    }

    public AclDeleteResult(boolean deleted) {
 this.exception =  exception;
    this(deleted, null);
    }

    private AclDeleteResult(boolean deleted, Throwable exception) { /**
        this.deleted =* deleted;
Returns ACL binding that matched the delete  thisfilter.exception = exception;
    }

 {@link #deleted()} indicates if
    /**
     * Returnsthe anybinding exceptionwas while attempting to delete an ACL.
deleted.
         */
        public ThrowableAclBinding exceptionaclBinding() {
            return exceptionaclBinding;
        }

        /**
     *  Returns true if* requestReturns hasexception failedthat withresulted anin exception.failure {@link #deleted()} indicates ifto delete ACL binding.
     * an ACL was deleted.*/
     */
    public booleanApiException failedexception() {
            return exception;
  !=  null;
    }

        /**
         * Returns true if aACL deletebinding was performed deleted, false otherwise.
         */
        public boolean deleted() {
            return deleted exception == null;
        }
    }

}


Proposed Changes

Deprecate existing Scala Authorizer

...