Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleJava Authorizer Interface
package org.apache.kafka.server.authorizer;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 *
 * Pluggable authorizer interface for Kafka brokers.
 *
 * Startup sequence in brokers:
 * <ol>
 *   <li>Broker creates authorizer instance if configured in `authorizer.class.name`.</li>
 *   <li>Broker configures and starts authorizer instance. Authorizer implementation starts loading its metadata.</li>
 *   <li>Broker starts SocketServer to accept connections and process requests.</li>
 *   <li>For each listener, SocketServer waits for authorization metadata to be available in the
 *       authorizer before accepting connections. The future returned by {@link #start(AuthorizerServerInfo)}
 *       must return only when authorizer is ready to authorize requests on the listener.</li>
 *   <li>Broker accepts connections. For each connection, broker performs authentication and then accepts Kafka requests.
 *       For each request, broker invokes {@link #authorize(AuthorizableRequestContext, List)} to authorize
 *       actions performed by the request.</li>
 * </ol>
 *
 * Authorizer implementation class may optionally implement @{@link org.apache.kafka.common.Reconfigurable}
 * to enable dynamic reconfiguration without restarting the broker.
 * <p>
 * <b>Threading model:</b>
 * <ul>
 *   <li>All authorizer operations including authorization and ACL updates must be thread-safe.</li>
 *   <li>ACL update methods are asynchronous. Implementations with low update latency may return a
 *       completed future using {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.
 *       This ensures that the request will be handled synchronously by the caller without using a
 *       purgatory to wait for the result. If ACL updates require remote communication which may block,
 *       return a future that is completed asynchronously when the remote operation completes. This enables
 *       the caller to process other requests on the request threads without blocking.</li>
 *   <li>Any threads or thread pools used for processing remote operations asynchronously can be started during
 *       {@link #start(AuthorizerServerInfo)}. These threads must be shutdown during {@link Authorizer#close()}.</li>
 * </ul>
 * </p>
 */
@InterfaceStability.Evolving
public interface Authorizer extends Configurable, Closeable {

    /**
     * Starts loading authorization metadata and returns futures that can be used to wait until
     * metadata for authorizing requests on each listener is available. Each listener will be
     * started only after its metadata is available and authorizer is ready to start authorizing
     * requests on that listener.
     *
     * @param serverInfo Metadata for the broker including broker id and listener endpoints
     * @return CompletionStage for each endpoint that completes when authorizer is ready to
     *         start authorizing requests on that listener.
     */
    Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo);

    /**
     * Authorizes the specified action. Additional metadata for the action is specified
     * in `requestContext`.
     * <p>
     * This @paramis requestContexta Requestsynchronous contextAPI includingdesigned requestfor type,use securitywith protocollocally andcached listenerACLs. name
Since this method is invoked *on @paramthe
 actions Actions being authorized including* resourcerequest andthread operationwhile forprocessing each action
request, implementations of this method *should @return List of authorization results for each action in the same order as the provided actionsavoid time-consuming
     * remote communication that may block request threads.
     */
    List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions);

    /** * @param requestContext Request context including request type, security protocol and listener name
     * @param actions CreatesActions newbeing ACLauthorized bindings.
including resource and operation for *each <p>action
     * This@return isList anof asynchronousauthorization APIresults thatfor enableseach theaction callerin tothe avoidsame blockingorder duringas the update. Implementations of thisprovided actions
     */
 API can return completedList<AuthorizationResult> futures using {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}authorize(AuthorizableRequestContext requestContext, List<Action> actions);

     /**
 to process the update synchronously* onCreates thenew requestACL threadbindings.
     * <p>
     * This is @paraman requestContextasynchronous RequestAPI contextthat ifenables the ACLcaller to isavoid beingblocking createdduring bythe aupdate. brokerImplementations toof handlethis
     * API can return completed futures using  a client request to {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}
     * to process the update synchronously on the request thread.
     *
     * @param requestContext Request context if the ACL is being created by a broker to handle
     *        a client request to create ACLs. This may be null if ACLs are created directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindings ACL bindings to create
     *
     * @return Create result for each ACL binding in the same order as in the input list. Each result
     *         is returned as a CompletionStage that completes when the result is available.
     */
    List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings);

    /**
     * Deletes all ACL bindings that match the provided filters.
     * <p>
     * This is an asynchronous API that enables the caller to avoid blocking during the update. Implementations of this
     * API can return completed futures using {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}
     * to process the update synchronously on the request thread.
     *
     * @param requestContext Request context if the ACL is being deleted by a broker to handle
     *        a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper
     *        using AclCommand.
     * @param aclBindingFilters Filters to match ACL bindings that are to be deleted
     *
     * @return Delete result for each filter in the same order as in the input list.
     *         Each result indicates which ACL bindings were actually deleted as well as any
     *         bindings that matched but could not be deleted. Each result is returned as a
     *         CompletionStage that completes when the result is available.
     */
    List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters);

    /**
     * Returns ACL bindings which match the provided filter.
     * <p>
     * @returnThis Iteratoris fora ACL bindings, which may be populated lazily.synchronous API designed for use with locally cached ACLs. This method is invoked on the request
     */
 thread while processing DescribeAcls requests and should avoid time-consuming remote communication that may
     Iterable<AclBinding> acls(AclBindingFilter filter);
}

* block request threads.
     *
     * @return Iterator for ACL bindings, which may be populated lazily.
     */
    Iterable<AclBinding> acls(AclBindingFilter filter);
}


Request context will be provided to authorizers Request context will be provided to authorizers using a new interface AuthorizableRequestContext. The existing class org.apache.kafka.common.requests.RequestContext will implement this interface. New methods may be added to this interface in future, so mock implementations using this interface should adapt to these changes.

Code Block
languagejava
titleRequest Context
package org.apache.kafka.server.authorizer;

import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Request context interface that provides data from request header as well as connection
 * and authentication information to plugins.
 */
@InterfaceStability.Evolving
public interface AuthorizableRequestContext {

    /**
     * Returns name of listener on which request was received.
     */
    String listenerlistenerName();

    /**
     * Returns the security protocol for the listener on which request was received.
     */
    SecurityProtocol securityProtocol();

    /**
     * Returns authenticated principal for the connection on which request was received.
     */
    KafkaPrincipal principal();

    /**
     * Returns client IP address from which request was sent.
     */
    InetAddress clientAddress();

    /**
     * 16-bit API key of the request from the request header. See
     * https://kafka.apache.org/protocol#protocol_api_keys for request types.
     */
    int requestType();

    /**
     * Returns the request version from the request header.
     */
    int requestVersion();

    /**
     * Returns the client id from the request header.
     */
    String clientId();

    /**
     * Returns the correlation id from the request header.
     */
    int correlationId();
}

...

Code Block
languagejava
titleEndpoint
package org.apache.kafka.common;

import java.util.Objects;
import java.util.ObjectsOptional;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Represents a broker endpoint.
 */

@InterfaceStability.Evolving
public class Endpoint {

    private final String listenerlistenerName;
    private final SecurityProtocol securityProtocol;
    private final String host;
    private final int port;

    public Endpoint(String listenerlistenerName, SecurityProtocol securityProtocol, String host, int port) {
        this.listenerlistenerName = listenerlistenerName;
        this.securityProtocol = securityProtocol;
        this.host = host;
        this.port = port;
    }

    /**
     * Returns the listener name of this endpoint.
 This is non-empty for */endpoints provided
    public String* listener() {
        return listenerto broker plugins, but may be empty when used in clients.
     */
    public Optional<String> listenerName() {
        return Optional.ofNullable(listenerName);
    }

    /**
     * Returns the security protocol of this endpoint.
     */
    public SecurityProtocol securityProtocol() {
        return securityProtocol;
    }

    /**
     * Returns advertised host name of this endpoint.
     */
    public String host() {
        return host;
    }

    /**
     * Returns the port to which the listener is bound.
     */
    public int port() {
        return port;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Endpoint)) {
            return false;
        }

        Endpoint that = (Endpoint) o;
        return Objects.equals(this.listenerlistenerName, that.listenerlistenerName) &&
            Objects.equals(this.securityProtocol, that.securityProtocol) &&
            Objects.equals(this.host, that.host) &&
            this.port == that.port;

    }

    @Override
    public int hashCode() {
        return Objects.hash(listenerlistenerName, securityProtocol, host, port);
    }

    @Override
    public String toString() {
        return "Endpoint(" +
            "listenerlistenerName='" + listenerlistenerName + '\'' +
            ", securityProtocol=" + securityProtocol +
            ", host='" + host + '\'' +
            ", port=" + port +
            ')';
    }
}

...

Code Block
languagejava
titleDelete Results
package org.apache.kafka.server.authorizer;

import java.util.Collections;
import java.util.Collection;
import java.util.Optional;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

@InterfaceStability.Evolving
public class AclDeleteResult {
    private final ApiException exception;
    private final Collection<AclBindingDeleteResult> aclBindingDeleteResults;

    public AclDeleteResult(ApiException exception) {
        this(Collections.emptySet(), exception);
    }

    public AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults) {
        this(deleteResults, null);
    }

    private AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults, ApiException exception) {
        this.aclBindingDeleteResults = deleteResults;
        this.exception = exception;
    }

    /**
     * Returns any exception while attempting to match ACL filter to delete ACLs.
     */
    public Optional<ApiException> exception() {
        return exception == null ? Optional.empty() : Optional.of(exception);
    }

    /**
     * Returns delete result for each matching ACL binding.
     */
    public Collection<AclBindingDeleteResult> aclBindingDeleteResults() {
        return aclBindingDeleteResults;
    }


    /**
     * Delete result for each ACL binding that matched a delete filter.
     */
    public static class AclBindingDeleteResult {
        private final AclBinding aclBinding;
        private final ApiException exception;

        public AclBindingDeleteResult(AclBinding aclBinding) {
            this(aclBinding, null);
        }

        public AclBindingDeleteResult(AclBinding aclBinding, ApiException exception) {
            this.aclBinding = aclBinding;
            this.exception = exception;
        }

        /**
         * Returns ACL binding that matched the delete filter. {@link #deleted()} indicates if
         * the binding was deleted.
         */
        public AclBinding aclBinding() {
            return aclBinding;
        }

        /**
         * Returns any exception that resulted in failure to delete ACL binding.
         */
        public Optional<ApiException> exception() {
            return exception == null ? Optional.empty() : Optional.of(exception);
        }
    }
}


Proposed Changes

Asynchronous update requests

kafka.server.KafkaApis will be updated to handle CreateAcls and DeleteAcls requests asynchronously using a purgatory. If Authorizer.createAcls or Authorizer.deleteAcls returns any ComplettionStage that is not complete, the request will be added to a purgatory and completed when all the stages complete. Authorizer implementations with low latency updates may continue to update synchronously and return a completed future. These requests will be completed in-line and will not be added to the purgatory.

Asynchronous updates are useful for Authorizer implementations that use external stores for ACLs, for example a database. Async handling of update requests will enable Kafka brokers to handle database outages without blocking request threads. As many databases now support async APIs (https://dev.mysql.com/doc/x-devapi-userguide/en/synchronous-vs-asynchronous-execution.html, https://blogs.oracle.com/java/jdbc-next:-a-new-asynchronous-api-for-connecting-to-a-database), async update API enables authorizers to take advantage of these APIs.

Purgatory metrics will be added for ACL updates, consistent with metrics from other purgatories. Two new metrics will be added:

  • kafka.server:type=DelayedOperationPurgatory,name=NumDelayedOperations,delayedOperation=acl-update
  • kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=acl-update

In addition to these metrics, existing request metrics for CreateAcls and DeleteAcls can be used to track the portion of time spent on async operations since local time is updated before the async wait and remote time is updated when async wait completes:

  • kafka.network:type=RequestMetrics,name=LocalTimeMs,request=CreateAcls
  • kafka.network:type=RequestMetrics,name=RemoteTimeMs,request=CreateAcls
  • kafka.network:type=RequestMetrics,name=LocalTimeMs,request=DeleteAcls
  • kafka.network:type=RequestMetrics,name=RemoteTimeMs,request=DeleteAcls

Deprecate existing Scala Authorizer Trait

...