Code Block
titleJava Authorizer Interface
package org.apache.kafka.server.authorizer;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.KafkaRequestContext;
import org.apache.kafka.server.BrokerInfoServerInfo;
import org.apache.kafka.server.BrokerInfoServerInfo.Endpoint;

 * Pluggable authorizer interface for Kafka brokers.
 * Startup sequence in brokers:
 * <ol>
 *   <li>Broker creates authorizer instance if configured in ``.</li>
 *   <li>Broker configures and starts authorizer instance. Authorizer implementation starts loading its metadata.</li>
 *   <li>Broker starts SocketServer to accept connections and process requests.</li>
 *   <li>For each listener, SocketServer waits for authorization metadata to be available in the
 *       authorizer before accepting connections. The future returned by {@link #start(BrokerInfoServerInfo)}
 *       must return only when authorizer is ready to authorize requests on the listener.</li>
 *   <li>Broker accepts connections. For each connection, broker performs authentication and then accepts Kafka requests.
 *       For each request, broker invokes {@link #authorize(KafkaRequestContext, List)} to authorize
 *       actions performed by the request.</li>
 * </ol>
 * Authorizer implementation class may optionally implement @{@link org.apache.kafka.common.Reconfigurable}
 * to enable dynamic reconfiguration without restarting the broker.
 * <p>
 * <b>Thread safety:</b> All authorizer operations including authorization and ACL updates must be thread-safe.
 * </p>
public interface Authorizer extends Configurable, Closeable {

     * Starts loading authorization metadata and returns futures that can be used to wait until
     * metadata for authorizing requests on each listener is available. Each listener will be
     * started only after its metadata is available and authorizer is ready to start authorizing
     * requests on that listener.
     * @param brokerInfoserverInfo Metadata for the broker including broker id and listener endpoints
     * @return CompletableFutures for each endpoint that completes when authorizer is ready to
     *         start authorizing requests on that listener.
    Map<Endpoint, CompletableFuture<Void>> start(BrokerInfoServerInfo brokerInfoserverInfo);

     * Authorizes the specified action. Additional metadata for the action is specified
     * in `requestContext`.
     * @param requestContext Request context including request type, security protocol and listener name
     * @param actions Actions being authorized including resource and operation for each action
     * @return List of authorization results for each action in the same order as the provided actions
    List<AuthorizationResult> authorize(KafkaRequestContext requestContext, List<Action> actions);

     * Creates new ACL bindings.
     * @param requestContext Request context if the ACL is being created by a broker to handle
     *        a client request to create ACLs. This may be null if ACLs are created directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindings ACL bindings to create
     * @return Create result for each ACL binding in the same order as in the input list
    List<AclCreateResult> createAcls(KafkaRequestContext requestContext, List<AclBinding> aclBindings);

     * Deletes all ACL bindings that match the provided filters.
     * @param requestContext Request context if the ACL is being deleted by a broker to handle
     *        a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindingFilters Filters to match ACL bindings that are to be deleted
     * @return Delete result for each filter in the same order as in the input list.
     *         Each result indicates which ACL bindings were actually deleted as well as any
     *         bindings that matched but could not be deleted.
    List<AclDeleteResult> deleteAcls(KafkaRequestContext requestContext, List<AclBindingFilter> aclBindingFilters);

     * Returns ACL bindings which match the provided filter.
     * @return Iterator for ACL bindings, which may be populated lazily.
    Iterable<AclBinding> acls(AclBindingFilter filter);


Code Block
titleRequest Context
package org.apache.kafka.common;

import org.apache.kafka.common.annotation.InterfaceStability;

 * Request context interface that provides data from request header as well as connection
 * and authentication information to plugins.
public interface KafkaRequestContext {

     * Returns name of listener on which request was received.
    String listener();

     * Returns the security protocol for the listener on which request was received.
    SecurityProtocol securityProtocol();

     * Returns authenticated principal for the connection on which request was received.
    KafkaPrincipal principal();

     * Returns client IP address from which request was sent.
    InetAddress clientAddress();

     * 16-bit API key of the request from the request header. See
     * for request types.
    int requestType();

     * Returns a name for the request type. For fetch requests, the metrics names
     * FetchFollower and FetchConsumer will be used to distinguish between
     * replica fetch requests and client fetch requests.
    String requestName();

     * Returns the request version from the request header.
    int requestVersion();

     * Returns the client id from the request header.
    String clientId();

     * Returns the correlation id from the request header.
    int correlationId();

BrokerInfo ServerInfo provides runtime broker configuration to authorization plugins including broker id, cluster id and endpoint information.

Code Block
titleBroker Runtime Config
package org.apache.kafka.server;

import java.util.Collection;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.annotation.InterfaceStability;

 * Runtime broker configuration metadata for broker-side plugins.
public interface BrokerInfoServerInfo {

    interface Endpoint {

        String listener();

        SecurityProtocol securityProtocol();

        String host();

        int port();

    ClusterResource clusterResource();

    int brokerId();

    Collection<Endpoint> endpoints();

    Endpoint interBrokerEndpoint();


Code Block
titleAuthorizable Action
package org.apache.kafka.server.authorizer;

import java.util.Objects;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class Action {

    private final ResourcePattern resourcePattern;
    private final AclOperation operation;
    private final int resourceReferenceCount;
    private final AuditFlagboolean auditFlaglogIfAllowed;
    private final intboolean resourceReferenceCountlogIfDenied;

    public Action(AclOperation operation,
        ResourceType  resourceType,
        StringResourcePattern resourceNameresourcePattern,
         AuditFlag auditFlag,
        int resourceReferenceCount) {,
        this.operation = operation;
        this.resourcePattern = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
boolean logIfAllowed,
            this.auditFlag = auditFlag     boolean logIfDenied) {
        this.operation = operation;
        this.resourcePattern = resourcePattern;
        this.logIfAllowed = logIfAllowed;
        this.logIfDenied = logIfDenied;
        this.resourceReferenceCount = resourceReferenceCount;

     * Resource on which action is being performed.
    public ResourcePattern resourcePattern() {
        return resourcePattern;

     * Operation being performed.
    public AclOperation operation() {
        return operation;

     * AuthorizationIndicates usageif flagaudit to enable authorization logs to distinguish between attemptslogs tracking ALLOWED access should include this action if result is
     * ALLOWED. toThe accessflag unauthorizedis resourcestrue andif otheraccess filteringto operationsa performedresource byis thegranted broker.
while processing the request as */a
     public* AuditFlagresult auditFlag() {
        return auditFlag;

of this authorization. The flag is false only for requests used to describe access where
     * Numberno ofoperation timeson the resource beingis authorizedactually isperformed referencedbased withinon the authorization requestresult.
 For example, a single */
    public *boolean request may reference `n` topic partitions of the same topic. Brokers will authorize the topic oncelogIfAllowed() {
        return logIfAllowed;

 with `resourceReferenceCount=n`. Authorizers may include* theIndicates countif in audit logs.
 tracking DENIED access should */
include this action if publicresult intis
 resourceReferenceCount() {
   * DENIED. The flag is returntrue resourceReferenceCount;
if access to a }

resource was explicitly requested and @Overriderequest
    public boolean* equals(Object o) {
        if (this == o) {is denied as a result of this authorization request. The flag is false if request was
     * filtering out authorized resources (e.g. to returnsubscribe true;
to regex pattern). The flag is also
   * false if this is if (!(o instanceof Action)) {
    an optional authorization where an alternative resource authorization is
     * applied if returnthis false;

fails (e.g. Cluster:Create which is subsequently overridden by Topic:Create).
   Action thatpublic =boolean logIfDenied(Action) o;{
        return Objects.equals(this.resourcePattern, that.resourcePattern) &&

      Objects.equals(this.operation, that.operation) &&
            Objects.equals(this.auditFlag, that.auditFlag) &&* Number of times the resource being authorized is referenced within the request. For example, a single
     * request may reference `n` topic  Objects.equals(this.resourceReferenceCount, that.resourceReferenceCount);


    @Overridepartitions of the same topic. Brokers will authorize the topic once
     * with `resourceReferenceCount=n`. Authorizers may include the count in audit logs.
    public int hashCoderesourceReferenceCount() {
        return Objects.hash(resourcePattern, operation, auditFlag)resourceReferenceCount;

    public Stringboolean toStringequals(Object o) {
        return "Action(" +if (this == o) {
            ", resourcePattern='" + resourcePattern + '\'' +
return true;
         ", operation='" + operation + '\'' +
if (!(o instanceof Action)) {
            return false;
  ", auditFlag='" + auditFlag + '\'' + }

        Action that =  ", resourceReferenceCount='" + resourceReferenceCount + '\'' +

Audit flag provides additional context for audit logging:

Code Block
titleAudit Flag
package org.apache.kafka.server.authorizer;

public enum AuditFlag {
     * Access was requested to resource. If authorization result is ALLOWED, access is granted to
     * the resource to perform the request. If DENY, request is failed with authorization failure.
     * <p>
     * Audit logs tracking ALLOWED access should include this if result is ALLOWED.
     * Audit logs tracking DENIED access should include this if result is DENIED.
     * </p>

     * Access was requested to resource. If authorization result is ALLOWED, access is granted to
     * the resource to perform the request. If DENY, alternative authorization rules are applied
     * to determine if access is allowed.
     * <p>
     * For example, topic creation is allowed if user has Cluster:Create
     * permission to create any topic or the fine-grained Topic:Create permission to create topics
     * of the requested name. Cluster:Create is an optional ACL in this case.
     * </p><p>
     * Audit logs tracking ALLOWED access should include this if result is ALLOWED.
     * Audit logs tracking DENIED access can omit this if result is DENIED, since an alternative
     * authorization is used to determine access.
     * </p>

     * Access was requested to authorized resources (e.g. to subscribe to regex pattern).
     * Request is performed on resources whose authorization result is ALLOWED and the rest of
     * the resources are filtered out.
     * <p>
     * Audit logs tracking ALLOWED access should include this if result is ALLOWED.
     * Audit logs tracking DENIED access can omit this if result is DENIED since access was not
     * actually requested for the specified resource and it is filtered out.
     * </p>

     * Request to list authorized operations. No access is actually performed by this request
     * based on the authorization result.
     * <p>
     * Audit logs tracking ALLOWED/DENIED access can omit these since no access is performed
     * as a result of this.
     * </p>

(Action) o;
        return Objects.equals(this.resourcePattern, that.resourcePattern) &&
            Objects.equals(this.operation, that.operation) &&
            this.resourceReferenceCount == that.resourceReferenceCount &&
            this.logIfAllowed == that.logIfAllowed &&
            this.logIfDenied == that.logIfDenied;


    public int hashCode() {
        return Objects.hash(resourcePattern, operation, resourceReferenceCount, logIfAllowed, logIfDenied);

    public String toString() {
        return "Action(" +
            ", resourcePattern='" + resourcePattern + '\'' +
            ", operation='" + operation + '\'' +
            ", resourceReferenceCount='" + resourceReferenceCount + '\'' +
            ", logIfAllowed='" + logIfAllowed + '\'' +
            ", logIfDenied='" + logIfDenied + '\'' +

Authorize method returns individual allowed/denied results for every action. ACL create and delete operations will return any exceptions from each access control entry requested.

Code Block
titleAuthorizer Operation Results
package org.apache.kafka.server.authorizer;

public enum AuthorizationResult {

ACL create operation returns any exception from each ACL binding requested.

Code Block
titleAuthorizer Operation Results
package org.apache.kafka.server.authorizer;

import org.apache.kafka.common.annotation.InterfaceStability;
public class AclCreateResult {
    private final Throwable exception;

    public AclCreateResult() {

    public AclCreateResult(Throwable exception) {
        this.exception = exception;

     * Returns any exception during create. If exception is null, the request has succeeded.
    public Throwable exception() {
        return exception;

     * Returns true if the request failed.
    public boolean failed() {
        return exception != null;

ACL delete operation returns any exception from each ACL filter requested. Matching ACL bindings for each filter are returned along with any delete failure.

Code Block
titleDelete Results
package org.apache.kafka.server.authorizer;

import java.util.Collections;
import java.util.Collection;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.errors.ApiException;

public class AclDeleteResult {
    private final ApiException exception;
    private final Collection<DeletionResult> deletionResults;

    public AclDeleteResult(ApiException exception) {
        this(Collections.emptySet(), exception);

    public AclDeleteResult(Collection<DeletionResult> deleteResults) {
        this(deleteResults, null);

    private AclDeleteResult(Collection<DeletionResult> deleteResults, ApiException exception) {
        this.deletionResults = deleteResults;
        this.exception = exception;

     * Returns any exception while attempting to match ACL filter to delete ACLs.
    public ApiException exception() {
        return exception;

     * Returns delete result for each matching ACL binding.
    public Collection<DeletionResult> deletionResults() {
        return deletionResults;

     * Delete result for each ACL binding that matched a delete filter.
    public static class DeletionResult {
        private final AclBinding aclBinding;
        private final ApiException exception;

        public DeletionResult(AclBinding aclBinding) {
            this(aclBinding, null);

        public DeletionResult(AclBinding aclBinding, ApiException exception) {
            this.aclBinding = aclBinding;
            this.exception = exception;

         * Returns ACL binding that matched the delete filter. {@link #deleted()} indicates if
         * the binding was deleted.
        public AclBinding aclBinding() {
            return aclBinding;

         * Returns exception that resulted in failure to delete ACL binding.
        public ApiException exception() {
            return exception;

         * Returns true if ACL binding was deleted, false otherwise.
        public boolean deleted() {
            return exception == null;
