...

  1. Define a new Java interface for authorizer in 'clients' module in the package 'org.apache.kafka.server' similar to other server-side pluggable classes.
  2. KIP-4 has added ACL-related classes in the package org.apache.kafka.common (e.g. ResourcePattern and AccessControlEntry) to support ACL management using AdminClient. We will attempt to reuse these classes wherever possible.
  3. Deprecate but retain existing Scala authorizer API for backward compatibility to ensure that existing custom authorizers can be used with new brokers.
  4. Provide context about the request to authorizers to enable context-specific logic based on security protocol or listener to be applied to authorization.
  5. Provide additional context about the request including ApiKey and correlation id from the request header since these are useful for matching debug-level authorization logs with corresponding request logs.
  6. For ACL updates, provide request context including principal requesting the update and the listener on which request arrived to enable additional validation.
  7. Return individual responses for each access control entry update when multiple entries of a resource are updated. At the moment, we update the ZooKeeper node for a resource pattern multiple times when a request adds or removes multiple entries for a resource in a single update request. Since it is a common usage pattern to add or remove multiple access control entries while updating ACLs for a resource, batched updates will be supported to enable a single atomic update for each resource pattern.
  8. Provide an authorization usage flag to authorizers so that authorization logs can indicate attempts to access unauthorized resources. Kafka brokers log denied operations at INFO level and allowed operations at DEBUG level with the expectation that denied operations are rare and indicate erroneous or malicious use of the system. But we currently have several uses of Authorizer#authorize for filtering accessible resources or operations, for example for regex subscription. These fill up authorization logs with denied log entries, making these logs unusable for determining actual attempts to access resources by users who don’t have appropriate permissions. The audit flag will enable the authorizer to determine the severity of denied access.
  9. For authorizers that don’t store metadata in ZooKeeper, ensure that authorizer metadata for each listener is available before starting up the listener. This enables different authorization metadata stores for different listeners.
  10. Add a new out-of-the-box authorizer class that implements the new authorizer interface, making use of the features supported by the new API.
  11. Retain existing audit log entry format in SimpleAclAuthorizer to ensure that tools that parse these logs continue to work.
  12. Enable Authorizer implementations to make use of additional Kafka interfaces similar to other pluggable callbacks. Authorizers can implement org.apache.kafka.common.Reconfigurable to support dynamic reconfiguration without restarting the broker. Authorizers will also be provided with the cluster id, which may be included in logs or used to support centralized ACL storage.
  13. Enable asynchronous ACL updates to avoid blocking broker request threads when ACLs are updated in a remote store (e.g. a database).
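Item 7 above describes batching all entries of one resource pattern into a single update. The grouping step can be sketched as follows. This is a minimal illustration only: `AclBatchSketch`, `Update` and `groupByResource` are hypothetical names, and plain strings stand in for the real `ResourcePattern` and `AccessControlEntry` classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AclBatchSketch {

    /** Hypothetical stand-in for one requested ACL change: a resource name plus one entry. */
    public static final class Update {
        public final String resource;
        public final String entry;

        public Update(String resource, String entry) {
            this.resource = resource;
            this.entry = entry;
        }
    }

    /**
     * Groups requested updates by resource, preserving request order, so that a store
     * could apply all entries of one resource in a single write.
     */
    public static Map<String, List<String>> groupByResource(List<Update> updates) {
        Map<String, List<String>> batches = new LinkedHashMap<>();
        for (Update update : updates) {
            batches.computeIfAbsent(update.resource, r -> new ArrayList<>()).add(update.entry);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Update> updates = new ArrayList<>();
        updates.add(new Update("topic:orders", "User:alice READ"));
        updates.add(new Update("topic:orders", "User:bob WRITE"));
        updates.add(new Update("group:billing", "User:alice READ"));
        // One batch per resource instead of one write per entry.
        System.out.println(groupByResource(updates));
    }
}
```

Each batch still needs a per-entry result so that the caller can report individual successes and failures, as item 7 requires.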

Public Interfaces

Authorizer Configuration

...

Code Block
languagejava
titleJava Authorizer Interface
package org.apache.kafka.server.authorizer;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 *
 * Pluggable authorizer interface for Kafka brokers.
 *
 * Startup sequence in brokers:
 * <ol>
 *   <li>Broker creates authorizer instance if configured in `authorizer.class.name`.</li>
 *   <li>Broker configures and starts authorizer instance. Authorizer implementation starts loading its metadata.</li>
 *   <li>Broker starts SocketServer to accept connections and process requests.</li>
 *   <li>For each listener, SocketServer waits for authorization metadata to be available in the
 *       authorizer before accepting connections. The future returned by {@link #start(AuthorizerServerInfo)}
 *       must return only when authorizer is ready to authorize requests on the listener.</li>
 *   <li>Broker accepts connections. For each connection, broker performs authentication and then accepts Kafka requests.
 *       For each request, broker invokes {@link #authorize(AuthorizableRequestContext, List)} to authorize
 *       actions performed by the request.</li>
 * </ol>
 *
 * Authorizer implementation class may optionally implement {@link org.apache.kafka.common.Reconfigurable}
 * to enable dynamic reconfiguration without restarting the broker.
 * <p>
 * <b>Threading model:</b>
 * <ul>
 *   <li>All authorizer operations including authorization and ACL updates must be thread-safe.</li>
 *   <li>ACL update methods are asynchronous. Implementations with low update latency may return a
 *       completed future using {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.
 *       This ensures that the request will be handled synchronously by the caller without using a
 *       purgatory to wait for the result. If ACL updates require remote communication which may block,
 *       return a future that is completed asynchronously when the remote operation completes. This enables
 *       the caller to process other requests on the request threads without blocking.</li>
 *   <li>Any threads or thread pools used for processing remote operations asynchronously can be started during
 *       {@link #start(AuthorizerServerInfo)}. These threads must be shutdown during {@link Authorizer#close()}.</li>
 * </ul>
 * </p>
 */
@InterfaceStability.Evolving
public interface Authorizer extends Configurable, Closeable {

    /**
     * Starts loading authorization metadata and returns futures that can be used to wait until
     * metadata for authorizing requests on each listener is available. Each listener will be
     * started only after its metadata is available and authorizer is ready to start authorizing
     * requests on that listener.
     *
     * @param serverInfo Metadata for the broker including broker id and listener endpoints
     * @return CompletionStage for each endpoint that completes when authorizer is ready to
     *         start authorizing requests on that listener.
     */
    Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo);

    /**
     * Authorizes the specified action. Additional metadata for the action is specified
     * in `requestContext`.
     *
     * @param requestContext Request context including request type, security protocol and listener name
     * @param actions Actions being authorized including resource and operation for each action
     * @return List of authorization results for each action in the same order as the provided actions
     */
    List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions);

    /**
     * Creates new ACL bindings.
     * <p>
     * This is an asynchronous API that enables the caller to avoid blocking during the update. Implementations of this
     * API can return completed futures using {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}
     * to process the update synchronously on the request thread.
     *
     * @param requestContext Request context if the ACL is being created by a broker to handle
     *        a client request to create ACLs. This may be null if ACLs are created directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindings ACL bindings to create
     *
     * @return Create result for each ACL binding in the same order as in the input list. Each result
     *         is returned as a CompletionStage that completes when the result is available.
     */
    List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings);

    /**
     * Deletes all ACL bindings that match the provided filters.
     * <p>
     * This is an asynchronous API that enables the caller to avoid blocking during the update. Implementations of this
     * API can return completed futures using {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}
     * to process the update synchronously on the request thread.
     *
     * @param requestContext Request context if the ACL is being deleted by a broker to handle
     *        a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindingFilters Filters to match ACL bindings that are to be deleted
     *
     * @return Delete result for each filter in the same order as in the input list.
     *         Each result indicates which ACL bindings were actually deleted as well as any
     *         bindings that matched but could not be deleted. Each result is returned as a
     *         CompletionStage that completes when the result is available.
     */
    List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters);

    /**
     * Returns ACL bindings which match the provided filter.
     *
     * @return Iterator for ACL bindings, which may be populated lazily.
     */
    Iterable<AclBinding> acls(AclBindingFilter filter);
}
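The threading model in the interface documentation distinguishes low-latency stores, which may return already-completed futures, from remote stores, which complete their futures later on their own threads. The sketch below illustrates the two styles with plain strings standing in for `AclBinding` and `AclCreateResult`. `AsyncAclUpdateSketch`, `createLocal` and `createRemote` are hypothetical names, and the remote write is only simulated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncAclUpdateSketch {

    /** Low-latency store: every result is already complete when the method returns. */
    public static List<CompletionStage<String>> createLocal(List<String> bindings) {
        List<CompletionStage<String>> results = new ArrayList<>();
        for (String binding : bindings) {
            results.add(CompletableFuture.completedFuture("created " + binding));
        }
        return results;
    }

    /** Remote store: the simulated remote write runs on the supplied pool, not on the calling thread. */
    public static List<CompletionStage<String>> createRemote(List<String> bindings, ExecutorService pool) {
        List<CompletionStage<String>> results = new ArrayList<>();
        for (String binding : bindings) {
            results.add(CompletableFuture.supplyAsync(() -> "created " + binding, pool));
        }
        return results;
    }

    public static void main(String[] args) {
        // A pool like this would be started in start() and shut down in close().
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            List<String> bindings = new ArrayList<>();
            bindings.add("binding-1");
            bindings.add("binding-2");
            for (CompletionStage<String> stage : createRemote(bindings, pool)) {
                System.out.println(stage.toCompletableFuture().join());
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

In both styles the returned list has one stage per input binding, in input order, which is what lets the caller match results to requests.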


Request context will be provided to authorizers using a new interface AuthorizableRequestContext. The existing class org.apache.kafka.common.requests.RequestContext will implement this interface. New methods may be added to this interface in future, so mock implementations using this interface should adapt to these changes.

Code Block
languagejava
titleRequest Context
package org.apache.kafka.server.authorizer;

import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Request context interface that provides data from request header as well as connection
 * and authentication information to plugins.
 */
@InterfaceStability.Evolving
public interface AuthorizableRequestContext {

    /**
     * Returns name of listener on which request was received.
     */
    String listener();

    /**
     * Returns the security protocol for the listener on which request was received.
     */
    SecurityProtocol securityProtocol();

    /**
     * Returns authenticated principal for the connection on which request was received.
     */
    KafkaPrincipal principal();

    /**
     * Returns client IP address from which request was sent.
     */
    InetAddress clientAddress();

    /**
     * 16-bit API key of the request from the request header. See
     * https://kafka.apache.org/protocol#protocol_api_keys for request types.
     */
    int requestType();

    /**
     * Returns the request version from the request header.
     */
    int requestVersion();

    /**
     * Returns the client id from the request header.
     */
    String clientId();

    /**
     * Returns the correlation id from the request header.
     */
    int correlationId();
}


AuthorizerServerInfo provides runtime broker configuration to authorization plugins including broker id, cluster id and endpoint information. New methods may be added to this interface in future, so mock implementations using this interface should adapt to these changes.

Code Block
languagejava
titleBroker Runtime Config
package org.apache.kafka.server.authorizer;

import java.util.Collection;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Runtime broker configuration metadata provided to authorizers during start up.
 */
@InterfaceStability.Evolving
public interface AuthorizerServerInfo {

    /**
     * Returns cluster metadata for the broker running this authorizer including cluster id.
     */
    ClusterResource clusterResource();

    /**
     * Returns broker id. This may be a generated broker id if `broker.id` was not configured.
     */
    int brokerId();

    /**
     * Returns endpoints for all listeners including the advertised host and port to which
     * the listener is bound.
     */
    Collection<Endpoint> endpoints();

    /**
     * Returns the inter-broker endpoint. This is one of the endpoints returned by {@link #endpoints()}.
     */
    Endpoint interBrokerEndpoint();
}
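The endpoints supplied at startup are what an authorizer would key its per-listener readiness futures on. The sketch below shows that shape with listener names as plain strings. `ListenerStartupSketch` and its `start` method are hypothetical, and treating one listener as ready immediately is an assumption made purely for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ListenerStartupSketch {

    /**
     * Returns one readiness future per listener. The listener named by readyListener is
     * marked ready at once; the others stay pending until their metadata has been loaded.
     */
    public static Map<String, CompletableFuture<Void>> start(List<String> listeners, String readyListener) {
        Map<String, CompletableFuture<Void>> readiness = new LinkedHashMap<>();
        for (String listener : listeners) {
            CompletableFuture<Void> ready = new CompletableFuture<>();
            if (listener.equals(readyListener)) {
                ready.complete(null);
            }
            readiness.put(listener, ready);
        }
        return readiness;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> readiness = start(Arrays.asList("INTERNAL", "EXTERNAL"), "INTERNAL");
        readiness.forEach((listener, ready) -> System.out.println(listener + " ready: " + ready.isDone()));

        // Simulate the metadata load for the remaining listener finishing later.
        readiness.get("EXTERNAL").complete(null);
        System.out.println("EXTERNAL ready: " + readiness.get("EXTERNAL").isDone());
    }
}
```

A caller that waits on each future before opening the corresponding listener gets the behaviour described in the startup sequence: no listener accepts connections before the authorizer can authorize requests on it.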


Endpoint is added as a common class so that it may be reused in several places in the code where we use this abstraction.

Code Block
languagejava
titleEndpoint
package org.apache.kafka.common;

import java.util.Objects;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Represents a broker endpoint.
 */

@InterfaceStability.Evolving
public class Endpoint {

    private final String listener;
    private final SecurityProtocol securityProtocol;
    private final String host;
    private final int port;

    public Endpoint(String listener, SecurityProtocol securityProtocol, String host, int port) {
        this.listener = listener;
        this.securityProtocol = securityProtocol;
        this.host = host;
        this.port = port;
    }

    /**
     * Returns the listener name of this endpoint.
     */
    public String listener() {
        return listener;
    }

    /**
     * Returns the security protocol of this endpoint.
     */
    public SecurityProtocol securityProtocol() {
        return securityProtocol;
    }

    /**
     * Returns advertised host name of this endpoint.
     */
    public String host() {
        return host;
    }

    /**
     * Returns the port to which the listener is bound.
     */
    public int port() {
        return port;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Endpoint)) {
            return false;
        }

        Endpoint that = (Endpoint) o;
        return Objects.equals(this.listener, that.listener) &&
            Objects.equals(this.securityProtocol, that.securityProtocol) &&
            Objects.equals(this.host, that.host) &&
            this.port == that.port;
    }

    @Override
    public int hashCode() {
        return Objects.hash(listener, securityProtocol, host, port);
    }

    @Override
    public String toString() {
        return "Endpoint(" +
            "listener='" + listener + '\'' +
            ", securityProtocol=" + securityProtocol +
            ", host='" + host + '\'' +
            ", port=" + port +
            ')';
    }
}



Action provides details of the action being authorized, including resource and operation. Additional context, including audit flags indicating authorization usage, is also included, enabling access violations to be distinguished from resource filtering or optional ACLs.


Code Block
languagejava
titleAuthorizable Action
package org.apache.kafka.server.authorizer;

import java.util.Objects;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

@InterfaceStability.Evolving
public class Action {

    private final ResourcePattern resourcePattern;
    private final AclOperation operation;
    private final int resourceReferenceCount;
    private final boolean logIfAllowed;
    private final boolean logIfDenied;

    public Action(AclOperation operation,
                  ResourcePattern resourcePattern,
                  int resourceReferenceCount,
                  boolean logIfAllowed,
                  boolean logIfDenied) {
        this.operation = operation;
        this.resourcePattern = resourcePattern;
        this.logIfAllowed = logIfAllowed;
        this.logIfDenied = logIfDenied;
        this.resourceReferenceCount = resourceReferenceCount;
    }

    /**
     * Resource on which action is being performed.
     */
    public ResourcePattern resourcePattern() {
        return resourcePattern;
    }

    /**
     * Operation being performed.
     */
    public AclOperation operation() {
        return operation;
    }

    /**
     * Indicates if audit logs tracking ALLOWED access should include this action if result is
     * ALLOWED. The flag is true if access to a resource is granted while processing the request as a
     * result of this authorization. The flag is false only for requests used to describe access where
     * no operation on the resource is actually performed based on the authorization result.
     */
    public boolean logIfAllowed() {
        return logIfAllowed;
    }

    /**
     * Indicates if audit logs tracking DENIED access should include this action if result is
     * DENIED. The flag is true if access to a resource was explicitly requested and request
     * is denied as a result of this authorization request. The flag is false if request was
     * filtering out authorized resources (e.g. to subscribe to regex pattern). The flag is also
     * false if this is an optional authorization where an alternative resource authorization is
     * applied if this fails (e.g. Cluster:Create which is subsequently overridden by Topic:Create).
     */
    public boolean logIfDenied() {
        return logIfDenied;
    }

    /**
     * Number of times the resource being authorized is referenced within the request. For example, a single
     * request may reference `n` topic partitions of the same topic. Brokers will authorize the topic once
     * with `resourceReferenceCount=n`. Authorizers may include the count in audit logs.
     */
    public int resourceReferenceCount() {
        return resourceReferenceCount;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Action)) {
            return false;
        }

        Action that = (Action) o;
        return Objects.equals(this.resourcePattern, that.resourcePattern) &&
            Objects.equals(this.operation, that.operation) &&
            this.resourceReferenceCount == that.resourceReferenceCount &&
            this.logIfAllowed == that.logIfAllowed &&
            this.logIfDenied == that.logIfDenied;
    }

    @Override
    public int hashCode() {
        return Objects.hash(resourcePattern, operation, resourceReferenceCount, logIfAllowed, logIfDenied);
    }

    @Override
    public String toString() {
        return "Action(" +
            "resourcePattern='" + resourcePattern + '\'' +
            ", operation='" + operation + '\'' +
            ", resourceReferenceCount='" + resourceReferenceCount + '\'' +
            ", logIfAllowed='" + logIfAllowed + '\'' +
            ", logIfDenied='" + logIfDenied + '\'' +
            ')';
    }
}
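One way an authorizer might use the two audit flags is to pick the log level for each result. The sketch below keeps the INFO level for denials and DEBUG for allowed operations stated earlier in this document. Demoting flagged-off results to TRACE is an assumption of this sketch, and `AuditLogSketch`, `Result` and `auditLevel` are hypothetical names rather than part of the proposed API.

```java
public class AuditLogSketch {

    /** Stand-in for the ALLOWED/DENIED authorization result. */
    public enum Result { ALLOWED, DENIED }

    /**
     * Chooses a log level name for one authorization result. Results whose flag is
     * false (for example, filtering for a regex subscription) are demoted to TRACE
     * so that they do not crowd out genuine access violations.
     */
    public static String auditLevel(Result result, boolean logIfAllowed, boolean logIfDenied) {
        if (result == Result.DENIED) {
            return logIfDenied ? "INFO" : "TRACE";
        }
        return logIfAllowed ? "DEBUG" : "TRACE";
    }

    public static void main(String[] args) {
        // Explicit access attempt that was denied.
        System.out.println(auditLevel(Result.DENIED, true, true));
        // Denial produced while filtering accessible resources.
        System.out.println(auditLevel(Result.DENIED, true, false));
    }
}
```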


Authorize method returns individual allowed/denied results for every action.

Code Block
languagejava
titleAuthorizer Operation Results
package org.apache.kafka.server.authorizer;

public enum AuthorizationResult {
    ALLOWED,
    DENIED
}


ACL create operation returns any exception from each ACL binding requested.

Code Block
languagejava
titleAuthorizer Operation Results
package org.apache.kafka.server.authorizer;

import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

@InterfaceStability.Evolving
public class AclCreateResult {
    public static final AclCreateResult SUCCESS = new AclCreateResult();

    private final ApiException exception;

    private AclCreateResult() {
        this(null);
    }

    public AclCreateResult(ApiException exception) {
        this.exception = exception;
    }

    /**
     * Returns any exception during create. If exception is empty, the request has succeeded.
     */
    public Optional<ApiException> exception() {
        return exception == null ? Optional.empty() : Optional.of(exception);
    }
}
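
The following sketch shows one way a caller might inspect create results. It assumes that results line up positionally with the requested bindings. The local `ApiException` and `AclCreateResult` classes are trimmed stand-ins so the example compiles without Kafka on the classpath, and `CreateResultSketch.describe` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Stand-in for org.apache.kafka.common.errors.ApiException (assumption: only the message is needed).
class ApiException extends RuntimeException {
    ApiException(String message) {
        super(message);
    }
}

// Trimmed stand-in for the AclCreateResult class.
class AclCreateResult {
    static final AclCreateResult SUCCESS = new AclCreateResult(null);

    private final ApiException exception;

    AclCreateResult(ApiException exception) {
        this.exception = exception;
    }

    Optional<ApiException> exception() {
        return Optional.ofNullable(exception);
    }
}

// Hypothetical helper class, not part of the proposed API.
final class CreateResultSketch {

    // Bindings are represented as plain strings for brevity (assumption).
    static List<String> describe(List<String> bindings, List<AclCreateResult> results) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < bindings.size(); i++) {
            // An empty exception means this binding was created.
            String outcome = results.get(i).exception()
                .map(e -> "failed: " + e.getMessage())
                .orElse("created");
            lines.add(bindings.get(i) + " -> " + outcome);
        }
        return lines;
    }

    public static void main(String[] args) {
        List<AclCreateResult> results = Arrays.asList(
            AclCreateResult.SUCCESS,
            new AclCreateResult(new ApiException("invalid principal")));
        describe(Arrays.asList("binding-1", "binding-2"), results).forEach(System.out::println);
    }
}
```

Because each binding carries its own result, one invalid binding does not have to hide the outcome of the others in the same request.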


ACL delete operation returns any exception from each ACL filter requested. Matching ACL bindings for each filter are returned along with any delete failure.

Code Block
languagejava
titleDelete Results
package org.apache.kafka.server.authorizer;

import java.util.Collections;
import java.util.Collection;
import java.util.Optional;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

@InterfaceStability.Evolving
public class AclDeleteResult {
    private final ApiException exception;
    private final Collection<AclBindingDeleteResult> aclBindingDeleteResults;

    public AclDeleteResult(ApiException exception) {
        this(Collections.emptySet(), exception);
    }

    public AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults) {
        this(deleteResults, null);
    }

    private AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults, ApiException exception) {
        this.aclBindingDeleteResults = deleteResults;
        this.exception = exception;
    }

    /**
     * Returns any exception while attempting to match ACL filter to delete ACLs.
     */
    public Optional<ApiException> exception() {
        return exception == null ? Optional.empty() : Optional.of(exception);
    }

    /**
     * Returns delete result for each matching ACL binding.
     */
    public Collection<AclBindingDeleteResult> aclBindingDeleteResults() {
        return aclBindingDeleteResults;
    }


    /**
     * Delete result for each ACL binding that matched a delete filter.
     */
    public static class AclBindingDeleteResult {
        private final AclBinding aclBinding;
        private final ApiException exception;

        public AclBindingDeleteResult(AclBinding aclBinding) {
            this(aclBinding, null);
        }

        public AclBindingDeleteResult(AclBinding aclBinding, ApiException exception) {
            this.aclBinding = aclBinding;
            this.exception = exception;
        }

        /**
         * Returns ACL binding that matched the delete filter. If {@link #exception()} is empty,
         * the binding was deleted.
         */
        public AclBinding aclBinding() {
            return aclBinding;
        }

        /**
         * Returns any exception that resulted in failure to delete ACL binding.
         */
        public Optional<ApiException> exception() {
            return exception == null ? Optional.empty() : Optional.of(exception);
        }
    }
}
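
Delete results carry errors at two levels: one for the filter as a whole and one for each matched binding. The sketch below illustrates how a caller might tell them apart. It assumes that an empty per-binding exception means the binding was deleted. The local classes are trimmed stand-ins (with `AclBinding` replaced by `String`), and `DeleteResultSketch.summarize` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Stand-in for org.apache.kafka.common.errors.ApiException (assumption: only the message is needed).
class ApiException extends RuntimeException {
    ApiException(String message) {
        super(message);
    }
}

// Trimmed stand-in for AclDeleteResult; bindings are plain strings instead of AclBinding.
class AclDeleteResult {
    private final ApiException exception;
    private final Collection<AclBindingDeleteResult> aclBindingDeleteResults;

    AclDeleteResult(ApiException exception) {
        this(Collections.emptySet(), exception);
    }

    AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults) {
        this(deleteResults, null);
    }

    private AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults, ApiException exception) {
        this.aclBindingDeleteResults = deleteResults;
        this.exception = exception;
    }

    Optional<ApiException> exception() {
        return Optional.ofNullable(exception);
    }

    Collection<AclBindingDeleteResult> aclBindingDeleteResults() {
        return aclBindingDeleteResults;
    }

    static class AclBindingDeleteResult {
        private final String aclBinding;
        private final ApiException exception;

        AclBindingDeleteResult(String aclBinding, ApiException exception) {
            this.aclBinding = aclBinding;
            this.exception = exception;
        }

        String aclBinding() {
            return aclBinding;
        }

        Optional<ApiException> exception() {
            return Optional.ofNullable(exception);
        }
    }
}

// Hypothetical helper class, not part of the proposed API.
final class DeleteResultSketch {

    static String summarize(AclDeleteResult result) {
        // Filter-level failure: no bindings were matched, so there is nothing else to report.
        if (result.exception().isPresent()) {
            return "filter failed: " + result.exception().get().getMessage();
        }
        int deleted = 0;
        List<String> failed = new ArrayList<>();
        for (AclDeleteResult.AclBindingDeleteResult r : result.aclBindingDeleteResults()) {
            if (r.exception().isPresent()) {
                failed.add(r.aclBinding()); // matched the filter but could not be deleted
            } else {
                deleted++;
            }
        }
        return "deleted=" + deleted + ", failed=" + failed;
    }

    public static void main(String[] args) {
        AclDeleteResult partial = new AclDeleteResult(Arrays.asList(
            new AclDeleteResult.AclBindingDeleteResult("binding-1", null),
            new AclDeleteResult.AclBindingDeleteResult("binding-2", new ApiException("update failed"))));
        System.out.println(summarize(partial));
        System.out.println(summarize(new AclDeleteResult(new ApiException("bad filter"))));
    }
}
```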


Proposed Changes

Deprecate existing Scala Authorizer Trait

...