Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleJava Authorizer Interface
package org.apache.kafka.server.authorizer;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.KafkaRequestContext;
import org.apache.kafka.server.BrokerInfoServerInfo;
import org.apache.kafka.server.BrokerInfoServerInfo.Endpoint;

/**
 *
 * Pluggable authorizer interface for Kafka brokers.
 *
 * Startup sequence in brokers:
 * <ol>
 *   <li>Broker creates authorizer instance if configured in {@code authorizer.class.name}.</li>
 *   <li>Broker configures and starts authorizer instance. Authorizer implementation starts loading its metadata.</li>
 *   <li>Broker starts SocketServer to accept connections and process requests.</li>
 *   <li>For each listener, SocketServer waits for authorization metadata to be available in the
 *       authorizer before accepting connections. The future returned for a listener by
 *       {@link #start(BrokerInfoServerInfo)} must complete only when authorizer is ready to
 *       authorize requests on that listener.</li>
 *   <li>Broker accepts connections. For each connection, broker performs authentication and then accepts Kafka requests.
 *       For each request, broker invokes {@link #authorize(KafkaRequestContext, List)} to authorize
 *       actions performed by the request.</li>
 * </ol>
 *
 * Authorizer implementation class may optionally implement {@link org.apache.kafka.common.Reconfigurable}
 * to enable dynamic reconfiguration without restarting the broker.
 * <p>
 * <b>Thread safety:</b> All authorizer operations including authorization and ACL updates must be thread-safe.
 * </p>
 */
@InterfaceStability.Evolving
public interface Authorizer extends Configurable, Closeable {

    /**
     * Starts loading authorization metadata and returns futures that can be used to wait until
     * metadata for authorizing requests on each listener is available. Each listener will be
     * started only after its metadata is available and authorizer is ready to start authorizing
     * requests on that listener.
     *
     * @param brokerInfoserverInfo Metadata for the broker including broker id and listener endpoints
     * @return Map from each endpoint to a CompletableFuture that completes when authorizer is
     *         ready to start authorizing requests on that listener.
     */
    Map<Endpoint, CompletableFuture<Void>> start(BrokerInfoServerInfo brokerInfoserverInfo);

    /**
     * Authorizes the specified actions. Additional metadata for the actions is specified
     * in {@code requestContext}.
     *
     * @param requestContext Request context including request type, security protocol and listener name
     * @param actions Actions being authorized including resource and operation for each action
     * @return List of authorization results for each action in the same order as the provided actions
     */
    List<AuthorizationResult> authorize(KafkaRequestContext requestContext, List<Action> actions);

    /**
     * Creates new ACL bindings.
     *
     * @param requestContext Request context if the ACL is being created by a broker to handle
     *        a client request to create ACLs. This may be null if ACLs are created directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindings ACL bindings to create
     *
     * @return Create result for each ACL binding in the same order as in the input list
     */
    List<AclCreateResult> createAcls(KafkaRequestContext requestContext, List<AclBinding> aclBindings);

    /**
     * Deletes all ACL bindings that match the provided filters.
     *
     * @param requestContext Request context if the ACL is being deleted by a broker to handle
     *        a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindingFilters Filters to match ACL bindings that are to be deleted
     *
     * @return Delete result for each filter in the same order as in the input list.
     *         Each result indicates which ACL bindings were actually deleted as well as any
     *         bindings that matched but could not be deleted.
     */
    List<AclDeleteResult> deleteAcls(KafkaRequestContext requestContext, List<AclBindingFilter> aclBindingFilters);

    /**
     * Returns ACL bindings which match the provided filter.
     *
     * @param filter Filter used to select the ACL bindings to return
     * @return Iterable over the matching ACL bindings, which may be populated lazily.
     */
    Iterable<AclBinding> acls(AclBindingFilter filter);
}

...

Code Block
languagejava
titleRequest Context
package org.apache.kafka.common;

import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Request context interface that provides data from request header as well as connection
 * and authentication information to plugins.
 */
@InterfaceStability.Evolving
public interface KafkaRequestContext {

    /**
     * Returns name of listener on which request was received.
     *
     * @return the listener name
     */
    String listener();

    /**
     * Returns the security protocol for the listener on which request was received.
     *
     * @return the listener's security protocol
     */
    SecurityProtocol securityProtocol();

    /**
     * Returns authenticated principal for the connection on which request was received.
     *
     * @return the authenticated principal
     */
    KafkaPrincipal principal();

    /**
     * Returns client IP address from which request was sent.
     *
     * @return the client's IP address
     */
    InetAddress clientAddress();

    /**
     * Returns the 16-bit API key of the request from the request header. See
     * https://kafka.apache.org/protocol#protocol_api_keys for request types.
     *
     * @return the API key, widened to an int
     */
    int requestType();

    /**
     * Returns a name for the request type. For fetch requests, the metrics names
     * FetchFollower and FetchConsumer will be used to distinguish between
     * replica fetch requests and client fetch requests.
     *
     * @return the request type name
     */
    String requestName();

    /**
     * Returns the request version from the request header.
     *
     * @return the request version
     */
    int requestVersion();

    /**
     * Returns the client id from the request header.
     *
     * @return the client id
     */
    String clientId();

    /**
     * Returns the correlation id from the request header.
     *
     * @return the correlation id
     */
    int correlationId();
}


`BrokerInfoServerInfo` provides runtime broker configuration to authorization plugins, including the broker id, cluster id and endpoint information.

Code Block
languagejava
titleBroker Runtime Config
package org.apache.kafka.server;

import java.util.Collection;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Runtime broker configuration metadata for broker-side plugins.
 */
@InterfaceStability.Evolving
public interface BrokerInfoServerInfo {

    /**
     * A single broker endpoint, described by its listener, security protocol, host and port.
     */
    interface Endpoint {

        /**
         * Returns the listener of this endpoint (presumably the listener name; confirm against implementation).
         */
        String listener();

        /**
         * Returns the security protocol of this endpoint.
         */
        SecurityProtocol securityProtocol();

        /**
         * Returns the host of this endpoint.
         */
        String host();

        /**
         * Returns the port of this endpoint.
         */
        int port();
    }

    /**
     * Returns the cluster resource for the cluster this broker belongs to.
     * NOTE(review): presumably this is how plugins obtain the cluster id; confirm.
     */
    ClusterResource clusterResource();

    /**
     * Returns the id of this broker.
     */
    int brokerId();

    /**
     * Returns the endpoints of this broker.
     */
    Collection<Endpoint> endpoints();

    /**
     * Returns the inter-broker endpoint.
     * NOTE(review): presumably the endpoint of the listener used for inter-broker communication; confirm.
     */
    Endpoint interBrokerEndpoint();
}

...

Code Block
languagejava
titleAuthorizable Action
package org.apache.kafka.server.authorizer;

import java.util.Objects;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

@InterfaceStability.Evolving
public class Action {

    private final ResourcePattern resourcePattern;
    private final AclOperation operation;
    private final int resourceReferenceCount;
    private final AuditFlagboolean auditFlaglogIfAllowed;
    private final intboolean resourceReferenceCountlogIfDenied;

    public Action(AclOperation operation,
        ResourceType  resourceType,
        StringResourcePattern resourceNameresourcePattern,
         AuditFlag auditFlag,
        int resourceReferenceCount) {,
        this.operation = operation;
        this.resourcePattern = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
boolean logIfAllowed,
            this.auditFlag = auditFlag     boolean logIfDenied) {
        this.operation = operation;
        this.resourcePattern = resourcePattern;
        this.logIfAllowed = logIfAllowed;
        this.logIfDenied = logIfDenied;
        this.resourceReferenceCount = resourceReferenceCount;
    }

    /**
     * Resource on which action is being performed.
     */
    public ResourcePattern resourcePattern() {
        return resourcePattern;
    }

    /**
     * Operation being performed.
     */
    public AclOperation operation() {
        return operation;
    }

    /**
     * AuthorizationIndicates usageif flagaudit to enable authorization logs to distinguish between attemptslogs tracking ALLOWED access should include this action if result is
     * ALLOWED. toThe accessflag unauthorizedis resourcestrue andif otheraccess filteringto operationsa performedresource byis thegranted broker.
while processing the request as */a
     public* AuditFlagresult auditFlag() {
        return auditFlag;
    }

of this authorization. The flag is false only for requests used to describe access where
    /**
     * Numberno ofoperation timeson the resource beingis authorizedactually isperformed referencedbased withinon the authorization requestresult.
 For example, a single */
    public *boolean request may reference `n` topic partitions of the same topic. Brokers will authorize the topic oncelogIfAllowed() {
        return logIfAllowed;
    }

     /**
 with `resourceReferenceCount=n`. Authorizers may include* theIndicates countif in audit logs.
 tracking DENIED access should */
include this action if publicresult intis
 resourceReferenceCount() {
   * DENIED. The flag is returntrue resourceReferenceCount;
if access to a }

resource was explicitly requested and @Overriderequest
    public boolean* equals(Object o) {
        if (this == o) {is denied as a result of this authorization request. The flag is false if request was
     * filtering out authorized resources (e.g. to returnsubscribe true;
to regex pattern). The flag is also
  }
   * false if this is if (!(o instanceof Action)) {
    an optional authorization where an alternative resource authorization is
     * applied if returnthis false;
        }

fails (e.g. Cluster:Create which is subsequently overridden by Topic:Create).
     */
   Action thatpublic =boolean logIfDenied(Action) o;{
        return Objects.equals(this.resourcePattern, that.resourcePattern) &&
  logIfDenied;
    }

    /**
      Objects.equals(this.operation, that.operation) &&
            Objects.equals(this.auditFlag, that.auditFlag) &&* Number of times the resource being authorized is referenced within the request. For example, a single
     * request may reference `n` topic  Objects.equals(this.resourceReferenceCount, that.resourceReferenceCount);

    }

    @Overridepartitions of the same topic. Brokers will authorize the topic once
     * with `resourceReferenceCount=n`. Authorizers may include the count in audit logs.
     */
    public int hashCoderesourceReferenceCount() {
        return Objects.hash(resourcePattern, operation, auditFlag)resourceReferenceCount;
    }

    @Override
    public Stringboolean toStringequals(Object o) {
        return "Action(" +if (this == o) {
            ", resourcePattern='" + resourcePattern + '\'' +
return true;
        }
         ", operation='" + operation + '\'' +
if (!(o instanceof Action)) {
            return false;
  ", auditFlag='" + auditFlag + '\'' + }

        Action that =  ", resourceReferenceCount='" + resourceReferenceCount + '\'' +
            ')';
    }
}

Audit flag provides additional context for audit logging:

Code Block
languagejava
titleAudit Flag
package org.apache.kafka.server.authorizer;

public enum AuditFlag {
    /**
     * Access was requested to resource. If authorization result is ALLOWED, access is granted to
     * the resource to perform the request. If DENY, request is failed with authorization failure.
     * <p>
     * Audit logs tracking ALLOWED access should include this if result is ALLOWED.
     * Audit logs tracking DENIED access should include this if result is DENIED.
     * </p>
     */
    MANDATORY_AUTHORIZE,

    /**
     * Access was requested to resource. If authorization result is ALLOWED, access is granted to
     * the resource to perform the request. If DENY, alternative authorization rules are applied
     * to determine if access is allowed.
     * <p>
     * For example, topic creation is allowed if user has Cluster:Create
     * permission to create any topic or the fine-grained Topic:Create permission to create topics
     * of the requested name. Cluster:Create is an optional ACL in this case.
     * </p><p>
     * Audit logs tracking ALLOWED access should include this if result is ALLOWED.
     * Audit logs tracking DENIED access can omit this if result is DENIED, since an alternative
     * authorization is used to determine access.
     * </p>
     */
    OPTIONAL_AUTHORIZE,

    /**
     * Access was requested to authorized resources (e.g. to subscribe to regex pattern).
     * Request is performed on resources whose authorization result is ALLOWED and the rest of
     * the resources are filtered out.
     * <p>
     * Audit logs tracking ALLOWED access should include this if result is ALLOWED.
     * Audit logs tracking DENIED access can omit this if result is DENIED since access was not
     * actually requested for the specified resource and it is filtered out.
     * </p>
     */
    FILTER,

    /**
     * Request to list authorized operations. No access is actually performed by this request
     * based on the authorization result.
     * <p>
     * Audit logs tracking ALLOWED/DENIED access can omit these since no access is performed
     * as a result of this.
     * </p>
     */
    LIST_AUTHORIZED
}

(Action) o;
        return Objects.equals(this.resourcePattern, that.resourcePattern) &&
            Objects.equals(this.operation, that.operation) &&
            this.resourceReferenceCount == that.resourceReferenceCount &&
            this.logIfAllowed == that.logIfAllowed &&
            this.logIfDenied == that.logIfDenied;

    }

    @Override
    public int hashCode() {
        return Objects.hash(resourcePattern, operation, resourceReferenceCount, logIfAllowed, logIfDenied);
    }

    @Override
    public String toString() {
        return "Action(" +
            ", resourcePattern='" + resourcePattern + '\'' +
            ", operation='" + operation + '\'' +
            ", resourceReferenceCount='" + resourceReferenceCount + '\'' +
            ", logIfAllowed='" + logIfAllowed + '\'' +
            ", logIfDenied='" + logIfDenied + '\'' +
            ')';
    }
}


The authorize method returns an individual allowed/denied result for every action. ACL create and delete operations return any exception raised for each access control entry requested.

Code Block
languagejava
titleAuthorizer Operation Results
package org.apache.kafka.server.authorizer;

/**
 * Outcome of authorizing a single action.
 */
public enum AuthorizationResult {

    /** The action is allowed. */
    ALLOWED,

    /** The action is denied. */
    DENIED
}


The ACL create operation returns any exception raised for each requested ACL binding.

Code Block
languagejava
titleAuthorizer Operation Results
package org.apache.kafka.server.authorizer;

import org.apache.kafka.common.annotation.InterfaceStability;
/**
 * Result of creating a single ACL binding. Holds the exception raised during
 * creation, or null when the creation succeeded.
 */
@InterfaceStability.Evolving
public class AclCreateResult {
    private final Throwable exception;

    /**
     * Creates a result with no exception, i.e. a successful creation.
     */
    public AclCreateResult() {
        this.exception = null;
    }

    /**
     * Creates a result carrying the given exception.
     *
     * @param exception Exception raised during create, or null if the request succeeded
     */
    public AclCreateResult(Throwable exception) {
        this.exception = exception;
    }

    /**
     * Returns the exception raised during create, if any.
     *
     * @return the exception, or null if the request has succeeded
     */
    public Throwable exception() {
        return this.exception;
    }

    /**
     * Reports whether the request failed.
     *
     * @return true if an exception is present, false otherwise
     */
    public boolean failed() {
        return this.exception != null;
    }
}


The ACL delete operation returns any exception raised for each requested ACL filter. The ACL bindings matching each filter are returned along with any failure to delete them.

Code Block
languagejava
titleDelete Results
package org.apache.kafka.server.authorizer;

import java.util.Collections;
import java.util.Collection;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.errors.ApiException;

/**
 * Result of applying a single ACL delete filter. Holds either an exception raised
 * for the filter, or the per-binding deletion results for the bindings it matched.
 */
public class AclDeleteResult {
    private final ApiException exception;
    private final Collection<DeletionResult> deletionResults;

    /**
     * Creates a result carrying an exception and an empty set of deletion results.
     *
     * @param exception Exception raised while attempting to match the filter
     */
    public AclDeleteResult(ApiException exception) {
        this.deletionResults = Collections.emptySet();
        this.exception = exception;
    }

    /**
     * Creates a result carrying per-binding deletion results and no exception.
     *
     * @param deleteResults Deletion result for each ACL binding that matched the filter
     */
    public AclDeleteResult(Collection<DeletionResult> deleteResults) {
        this.deletionResults = deleteResults;
        this.exception = null;
    }

    /**
     * Returns any exception while attempting to match ACL filter to delete ACLs.
     *
     * @return the exception, or null if none was supplied
     */
    public ApiException exception() {
        return this.exception;
    }

    /**
     * Returns delete result for each matching ACL binding.
     *
     * @return the collection held by this result (returned as-is, not copied)
     */
    public Collection<DeletionResult> deletionResults() {
        return this.deletionResults;
    }


    /**
     * Outcome for one ACL binding that matched a delete filter: the binding itself
     * plus the exception that prevented its deletion, if any.
     */
    public static class DeletionResult {
        private final AclBinding aclBinding;
        private final ApiException exception;

        /**
         * Creates a result for a matched binding with no exception.
         *
         * @param aclBinding ACL binding that matched the delete filter
         */
        public DeletionResult(AclBinding aclBinding) {
            this.aclBinding = aclBinding;
            this.exception = null;
        }

        /**
         * Creates a result for a matched binding with the given exception.
         *
         * @param aclBinding ACL binding that matched the delete filter
         * @param exception Exception that prevented deletion, or null if the binding was deleted
         */
        public DeletionResult(AclBinding aclBinding, ApiException exception) {
            this.aclBinding = aclBinding;
            this.exception = exception;
        }

        /**
         * Returns ACL binding that matched the delete filter. {@link #deleted()} indicates if
         * the binding was deleted.
         */
        public AclBinding aclBinding() {
            return this.aclBinding;
        }

        /**
         * Returns exception that resulted in failure to delete ACL binding.
         *
         * @return the exception, or null if none was recorded
         */
        public ApiException exception() {
            return this.exception;
        }

        /**
         * Returns true if ACL binding was deleted, false otherwise. A binding counts as
         * deleted exactly when no exception was recorded for it.
         */
        public boolean deleted() {
            return this.exception == null;
        }
    }
}

...