You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »


Status

Current state: "Under Discussion"

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: Unable to render Jira issues macro, execution error.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka supports pluggable authorization using the Scala trait kafka.security.auth.Authorizer. KIP-50 was accepted to replace this with a Java interface and Java ACL classes in the package org.apache.kafka.common in the 'clients' module. But this was never merged. Since KIP-50 was accepted, we have added new broker-side pluggable interfaces as Java interfaces in 'clients' in the package `org.apache.kafka.server`. These Java interfaces provide a better compatibility story than Scala traits, allowing us to evolve the API. 

This KIP is a replacement for KIP-50 to introduce a new Java interface for authorization. KIP-50 proposed to break compatibility because it was in the early days of adoption of authorization in Kafka. Since Kafka authorization has been widely adopted since then, we propose to deprecate, but continue to support the old interface to avoid breaking existing deployments during upgrade. We will also address the known limitations of the existing interface.

Goals

  1. Define a new Java interface for authorizer in 'clients' module in the package 'org.apache.kafka.server' similar to other server-side pluggable classes.
  2. KIP-4 has added ACL-related classes in the package org.apache.kafka.common (e.g. ResourcePattern and AccessControlEntry) to support ACL management using AdminClient. We will attempt to reuse these classes wherever possible.
  3. Deprecate but retain existing Scala authorizer API for backward compatibility to ensure that existing custom authorizers can be used with new brokers.
  4. Provide context about the request to authorizers to enable context-specific logic based on security protocol or listener to be applied to authorization.
  5. Provide additional context about the request including ApiKey and correlation id from the request header since these are useful for matching debug-level authorization logs with corresponding request logs.
  6. For ACL updates, provide request context including principal requesting the update and the listener on which request arrived to enable additional validation.
  7. Return individual responses for each access control entry update when multiple entries of a resource are updated. At the moment, we update the ZooKeeper node for a resource pattern multiple times when a request adds or removes multiple entries for a resource in a single update request. Since it is a common usage pattern to add or remove multiple access control entries while updating ACLs for a resource, batched updates will be supported to enable a single atomic update for each resource pattern.
  8. Provide authorization mode to authorizers to enable authorization logs to indicate attempts to access unauthorized resources. Kafka brokers log denied operations at INFO level and allowed operations at DEBUG level with the expectation that denied operations are rare and indicate erroneous or malicious use of the system. But we currently have several uses of Authorizer#authorize for filtering accessible resources or operations, for example for regex subscription. These fill up authorization logs with denied log entries, making these logs unusable for determining actual attempts to access resources by users who don’t have appropriate permissions. Authorization mode will enable the authorizer to determine the severity of denied access.
  9. For authorizers that don’t store metadata in ZooKeeper, ensure that authorizer metadata for each listener is available before starting up the listener. This enables different authorization metadata stores for different listeners.
  10. Rewrite out-of-the-box authorizer class SimpleAclAuthorizer to implement the new authorizer interface, making use of the features supported by the new API.
  11. Retain existing audit log entry format in SimpleAclAuthorizer to ensure that tools that parse these logs continue to work.
  12. Enable Authorizer implementations to make use of additional Kafka interfaces similar to other pluggable callbacks. Authorizers can implement org.apache.kafka.common.Reconfigurable  to support dynamic reconfiguration without restarting the broker. Authorizers implementing org.apache.kafka.common.ClusterResourceListener will be provided cluster id which may be included in logs or used to support centralized ACL storage.

Public Interfaces

Authorizer Configuration

A new configuration option `authorizer.class` will be introduced to configure a broker authorizer using the new Java interface org.apache.kafka.server.authorizer.Authorizer . The existing config `authorizer.class.name` will be deprecated, but will continue to support authorizers using the existing Scala trait kafka.security.auth.Authorizer.

New configuration option:

  • Name: authorizer.class
  • Type: CLASS

Authorizer API

The new Java Authorizer interface and its supporting classes are shown below:

Java Authorizer Interface
package org.apache.kafka.server.authorizer;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.KafkaRequestContext;

/**
 *
 * Pluggable authorizer interface for Kafka brokers.
 *
 * Startup sequence in brokers:
 * <ol>
 *   <li>Broker creates authorizer instance if configured in `authorizer.class` or `authorizer.class.name`.</li>
 *   <li>Broker configures and starts authorizer instance. Authorizer implementation starts loading its metadata.</li>
 *   <li>Broker starts SocketServer to accept connections and process requests.</li>
 *   <li>For each listener, SocketServer waits for authorization metadata to be available in the
 *       authorizer before accepting connections. The future returned by {@link #start(String, SecurityProtocol)}
 *       must return only when authorizer is ready to authorize requests on the listener.</li>
 *   <li>Broker accepts connections. For each connection, broker performs authentication and then accepts Kafka requests.
 *       For each request, broker invokes {@link #authorize(KafkaRequestContext, List)} to authorize
 *       actions performed by the request.</li>
 * </ol>
 *
 * Authorizer implementation class may optionally implement the following interfaces:
 * <ul>
 *   <li>@{@link org.apache.kafka.common.Reconfigurable} to enable dynamic reconfiguration
 *   without restarting the broker.</li>
 *   <li>{@link org.apache.kafka.common.ClusterResourceListener} to obtain cluster id</li>
 * </ul>
 */
@InterfaceStability.Evolving
public interface Authorizer extends Configurable, Closeable {

    /**
     * Starts loading authorization metadata and returns futures that can be used to wait until
     * metadata for authorizing requests on each listener is available. Each listener will be
     * started only after its metadata is available and authorizer is ready to start authorizing
     * requests on that listener.
     *
     * @param listeners Listener names with their security protocols
     * @return CompletableFutures for each listener that completes when authorizer is ready to
     *         start authorizing requests on that listener. Returned map contains one future
     *         for each listener name in the input `listeners` map.
     */
    Map<String, CompletableFuture<Void>> start(Map<String, SecurityProtocol> listeners);

    /**
     * Authorizes the specified action. Additional metadata for the action is specified
     * in `requestContext`.
     *
     * @param requestContext Request context including request type, security protocol and listener name
     * @param actions Actions being authorized including resource and operation for each action
     * @return List of authorization results for each action in the same order as the provided actions
     */
    List<AuthorizationResult> authorize(KafkaRequestContext requestContext, List<Action> actions);

    /**
     * Creates a new ACL binding.
     *
     * @param requestContext Request context if the ACL is being created by a broker to handle
     *        a client request to create ACLs. This may be null if ACLs are created directly in ZooKeeper
     *        using AclCommand.
     * @param resourcePattern Resource pattern for which ACLs are being added
     * @param accessControlEntries List of access control entries to add for resource
     *
     * @return Create result for each access control entry in the same order as in the input list
     */
    List<AclCreateResult> createAcls(KafkaRequestContext requestContext,
                                     ResourcePattern resourcePattern,
                                     List<AccessControlEntry> accessControlEntries);

    /**
     * Deletes the specified ACL binding.
     *
     * @param requestContext Request context if the ACL is being deleted by a broker to handle
     *        a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper
     *        using AclCommand.
     * @param resourcePattern Resource pattern for which ACLs are being removed
     * @param accessControlEntries Set of access control entries to remove for resource
     *
     * @return Delete result for each access control entry in the same order as in the input list.
     *         Each result indicates if the entry was actually deleted.
     */
    List<AclDeleteResult> deleteAcls(KafkaRequestContext requestContext,
                                     ResourcePattern resourcePattern,
                                     List<AccessControlEntry> accessControlEntries);

    /**
     * Deletes all ACL bindings for the specified resource pattern. Only ACLs explicitly configured
     * with this resource pattern are deleted, no matching is performed.
     *
     * @param requestContext Request context if the ACLs are being deleted by a broker to handle
     *        a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper
     *        using AclCommand.
     * @param resourcePattern Resource pattern whose ACLs are deleted
     * @return true if at least one ACL binding was deleted, false otherwise
     */
    boolean deleteAcls(KafkaRequestContext requestContext, ResourcePattern resourcePattern);

    /**
     * Returns all ACL bindings.
     *
     * @return Access control entries of all ACL bindings grouped by resource pattern.
     */
    Map<ResourcePattern, Set<AccessControlEntry>> acls();

    /**
     * Returns access control entries defined for the specified resource pattern.
     *
     * @param resourcePattern Resource pattern for which ACLs are returned. No matching is performed,
     *        only ACLs defined for the specified pattern are returned.
     *
     * @return ACL bindings for resource pattern.
     */
    Set<AccessControlEntry> acls(ResourcePattern resourcePattern);
}


Request context will provided to authorizers using a new interface KafkaRequestContext. The existing class org.apache.kafka.common.requests.RequestContext will implement this interface.

Request Context
package org.apache.kafka.common;

import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;

/**
 * Request context interface that provides data from request header as well as connection
 * and authentication information to plugins.
 */
@InterfaceStability.Evolving
public interface KafkaRequestContext {

    /**
     * Returns name of listener on which request was received.
     */
    String listenerName();

    /**
     * Returns the security protocol for the listener on which request was received.
     */
    SecurityProtocol securityProtocol();

    /**
     * Returns authenticated principal for the connection on which request was received.
     */
    KafkaPrincipal principal();

    /**
     * Returns client IP address from which request was sent.
     */
    InetAddress clientAddress();

    /**
     * Returns the request type (ApiKey) from the request header. For fetch requests,
     * the metrics names FetchFollower and FetchConsumer will be used to distinguish between
     * replica fetch requests and client fetch requests.
     */
    String requestType();

    /**
     * Returns the request version from the request header.
     */
    int requestVersion();

    /**
     * Returns the client id from the request header.
     */
    String clientId();

    /**
     * Returns the correlation id from the request header.
     */
    int correlationId();
}


Action  provides details of the action being authorized including resource and operation. Additional context including authorization mode are also included, enabling access violation to be distinguished from resource filtering or optional ACLs.


Authorizable Action
package org.apache.kafka.server.authorizer;

import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

@InterfaceStability.Evolving
public class Action {

    private final ResourcePattern resourcePattern;
    private final AclOperation operation;
    private final AuthorizationMode authorizationMode;
    private final int count;

    public Action(AclOperation operation,
                  ResourceType resourceType,
                  String resourceName,
                  AuthorizationMode authorizationMode,
                  int count) {
        this.operation = operation;
        this.resourcePattern = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
        this.authorizationMode = authorizationMode;
        this.count = count;
    }

    /**
     * Resource on which action is being performed.
     */
    public ResourcePattern resourcePattern() {
        return resourcePattern;
    }

    /**
     * Operation being performed.
     */
    public AclOperation operation() {
        return operation;
    }

    /**
     * Authorization mode to enable authorization logs to distinguish between attempts
     * to access unauthorized resources and other filtering operations performed by the broker.
     */
    public AuthorizationMode authorizationMode() {
        return authorizationMode;
    }

    /**
     * Number of times the authorization result is used. For example, a single topic
     * authorization result may be used with `count` partitions of the topic within a single
     * request.
     */
    public int count() {
        return count;
    }


Authorization Mode:

Authorization Mode
package org.apache.kafka.server.authorizer;

public enum AuthorizationMode {
    /**
     * Access was requested to resource. If authorization result is ALLOWED, access is granted to
     * the resource to perform the request. If DENIED, request is failed with authorization failure.
     */
    MANDATORY,

    /**
     * Access was requested to resource. If authorization result is ALLOWED, access is granted to
     * the resource to perform the request. If DENIED, alternative authorization rules are applied
     * to determine if access is allowed.
     */
    OPTIONAL,

    /**
     * Access was requested to authorized resources (e.g. to subscribe to regex pattern).
     * Request is performed on resources whose authorization result is ALLOWED and the rest of
     * the resources are filtered out.
     */
    FILTER,

    /**
     * Request to list authorized operations. No access is actually performed by this request
     * based on the authorization result.
     */
    LIST_AUTHORIZED
}


Authorize method returns individual allowed/denied results for every action. ACL create and delete operations will return any exceptions from each access control entry requested.

Authorizer Operation Results
package org.apache.kafka.server.authorizer;

public enum AuthorizationResult {
    ALLOWED,
    DENIED
}

import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Evolving
public class AclCreateResult {
    private final Throwable exception;

    public AclCreateResult() {
        this(null);
    }

    public AclCreateResult(Throwable exception) {
        this.exception = exception;
    }

    /**
     * Returns any exception during create. If exception is null, the request has succeeded.
     */
    public Throwable exception() {
        return exception;
    }

    /**
     * Returns true if the request failed.
     */
    public boolean failed() {
        return exception != null;
    }
}

import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Evolving
public class AclDeleteResult {
    private final Throwable exception;
    private final boolean deleted;

    public AclDeleteResult(Throwable exception) {
        this(false, exception);
    }

    public AclDeleteResult(boolean deleted) {
        this(deleted, null);
    }

    private AclDeleteResult(boolean deleted, Throwable exception) {
        this.deleted = deleted;
        this.exception = exception;
    }

    /**
     * Returns any exception while attempting to delete an ACL.
     */
    public Throwable exception() {
        return exception;
    }

    /**
     * Returns true if request has failed with an exception. {@link #deleted()} indicates if
     * an ACL was deleted.
     */
    public boolean failed() {
        return exception != null;
    }

    /**
     * Returns true if a delete was performed.
     */
    public boolean deleted() {
        return deleted;
    }
}



Proposed Changes

Deprecate existing Scala Authorizer

kafka.security.auth.Authorizer will be deprecated along with all the supporting Scala classes including Resource, Operations and ResourceTypes. A new AuthorizerWrapper class will be introduced to wrap implementations using the Scala trait into the new Java interface. All usage of Authorizer (e.g. in KafkaApis) will be replaced with the new authorizer interface.

SimpleAclAuthorizer

SimpleAclAuthorizer will be updated to implement the new interface, making use of the additional request context available to improve authorization logging. This enables the authorizer to be used with the new config 'authorizer.class'. SimpleAclAuthorizer will remain a Scala class in `core`.  SimpleAclAuthorizer will continue to implement the old Scala Authorizer trait so that it can continue to be used with the old config 'authorizer.class.name', Internally, it will be handled as the new interface without a wrapper regardless of which config was used.

Optional Interfaces

ClusterResourceListener

kafka.server.KafkaServer will be updated to provide cluster-id to the configured authorizer if it implements org.apache.kafka.common.ClusterResourceListener interface. This will enable the authorizer to include cluster-id in logs and to use centralized ACLs if required.

Reconfigurable

kafka.server.DynamicBrokerConfig will be updated to support dynamic update of authorizers which implement org.apache.kafka.common.Reconfigurable. Authorizer implementations can react to dynamic updates of any of its configs including custom configs, avoiding broker restarts to update configs.

Compatibility, Deprecation, and Migration Plan

What impact (if any) will there be on existing users?

Existing authorizer interfaces and classes are being deprecated, but not removed. We will continue to support the old config with the old API to ensure that existing users are not impacted.

If we are changing behavior how will we phase out the older behavior?

We are deprecating the existing config `authorizer.class.name` and the existing Scala authorizer API. These will be removed in a future release, but will continue to be supported for backward compatibility until then. Until the old authorizer is removed, no config changes are required during upgrade.

Test Plan

All the existing integration and system tests will be updated to use the new config and the new authorization class. Unit tests will be added for testing the new methods and parameters being introduced in this KIP. An additional integration test will be added to test authorizers using the old Scala API.  

Rejected Alternatives

Description of Authorizer as proposed in KIP-50

KIP-50 proposes to return a textual description of the authorizer that can be used in tools like AclCommand. Since we don’t support returning a description using AdminClient and none of the other pluggable APIs have similar support, this KIP does not add a method to return authorizer description.

Separate authorizers for each listener

In some environments, authorization decisions may be dependent on the security protocol used by the client or the listener on which the request was received. We have listener prefixed configs to enable independent listener-specific configs for authentication etc. But since authorizers tend to cache a lot of metadata and need to watch for changes in metadata, a single shared instance works better for authorization. This KIP proposes a single authorizer that can use listener and security protocol provided in the authorization context to include listener-specific authorization logic.



  • No labels