Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Kafka authorizer is agnostic of principal types it supports, so are the acls CRUD methods in Authorizer interface. The intent behind is to keep Kafka principal types pluggable, which is really great. However, this leads to Acls CRUD methods not performing any check on validity of acls, as they are not aware of what principal types Authorizer implementation supports. This opens up space for lots of user errors, KAFKA-3097 is an instance.

Public Interfaces

...

Move following interfaces/ classes to package, org.apache.kafka.common.security.auth.

  1. Authorizer
  2. Acl
  3. Operation
  4. PermissionType
  5. Resource
  6. ResourceType
  7. KafkaPrincipal
  8. Session

...

Add following public methods to Authorizer interface.

Code Block
/**
* description of authorizer implementation, like, valid principal types.
* @return description of authorizer implementation.
*/
public String description()

...

Add following Exceptions to org.apache.kafka.common.errors.

InvalidAclException

Code Block
/**
 * Throw when an invalid Acl is being added or removed.
 */
public class InvalidAclException extends ApiException {

    private static final long serialVersionUID = 1L;

    public InvalidAclException(String message) {
        super(message);
    }

    public InvalidAclException(String message, Throwable cause) {
        super(message, cause);
    }

}

InvalidOperationException

Code Block
/**
 * Throw when an invalid operation is being performed on a resource.
 */
public class InvalidOperationException extends ApiException {

    private static final long serialVersionUID = 1L;

    public InvalidOperationException(String message) {
        super(message);
    }

    public InvalidOperationException(String message, Throwable cause) {
        super(message, cause);
    }

}

InvalidPrincipalException

Code Block
/**
 * Throw when an invalid principal is provided by user.
 */
public class InvalidPrincipalException extends ApiException {

    private static final long serialVersionUID = 1L;

    public InvalidPrincipalException(String message) {
        super(message);
    }

    public InvalidPrincipalException(String message, Throwable cause) {
        super(message, cause);
    }

}

InvalidResourceException

Code Block
/**
 * Throw when an invalid resource is accessed.
 */
public class InvalidResourceException extends ApiException {

    private static final long serialVersionUID = 1L;

    public InvalidResourceException(String message) {
        super(message);
    }

    public InvalidResourceException(String message, Throwable cause) {
        super(message, cause);
    }

}

 

...

Update Authorizer interface to get rid of getter naming convention.

Code Block
public interface Authorizer extends Configurable {

    /**
     * @param session   The session being authenticated.
     * @param operation Type of operation client is trying to perform on resource.
     * @param resource  Resource the client is trying to access.
     * @return
     *
     * @throws org.apache.kafka.common.errors.InvalidResourceException if resource does not exist
     * @throws org.apache.kafka.common.errors.InvalidOperationException if requested operation is not
     *          supported on the resource
     */
    public boolean authorize(Session session, Operation operation, Resource resource);

    /**
     * implementation specific description, like, supported principal types.
     *
     * @return implementation specific description.
     */
    public String description();

    /**
     * add the acls to resource, this is an additive operation so existing acls will not be overwritten, instead these new
     * acls will be added to existing acls.
     *
     * @param acls     set of acls to add to existing acls
     * @param resource the resource to which these acls should be attached.
     *
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to add acls for the resource
     * @throws org.apache.kafka.common.errors.InvalidResourceException if resource does not exist
     * @throws org.apache.kafka.common.errors.InvalidAclException if an invalid acl is being added
     */
    public void addAcls(Set<Acl> acls, Resource resource);

    /**
     * remove these acls from the resource.
     *
     * @param acls     set of acls to be removed.
     * @param resource resource from which the acls should be removed.
     * @return true if some acl got removed, false if no acl was removed.
     *
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to remove acls for the resource
     * @throws org.apache.kafka.common.errors.InvalidResourceException if resource does not exist
     * @throws org.apache.kafka.common.errors.InvalidAclException if an invalid acl is being removed
     */
    public boolean removeAcls(Set<Acl> acls, Resource resource);

    /**
     * remove a resource along with all of its acls from acl store.
     *
     * @param resource
     * @return
     *
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to remove acls for the resource
     * @throws org.apache.kafka.common.errors.InvalidResourceException if resource does not exist
     */
    public boolean removeAcls(Resource resource);

    /**
     * get set of acls for this resource
     *
     * @param resource
     * @return empty set if no acls are found, otherwise the acls for the resource.
     *
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to access acls for the resource
     * @throws org.apache.kafka.common.errors.InvalidResourceException if resource does not exist
     */
    public Set<Acl> acls(Resource resource);

    /**
     * get the acls for this principal.
     *
     * @param principal
     * @return empty Map if no acls exist for this principal, otherwise a map of resource -> acls for the principal.
     *
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to access acls for the principal
     * @throws org.apache.kafka.common.errors.InvalidPrincipalException if principal is invalid
     */
    public Map<Resource, Set<Acl>> acls(KafkaPrincipal principal);

    /**
     * gets the map of resource to acls for all resources.
     */
    public Map<Resource, Set<Acl>> acls();

    /**
     * Closes this instance.
     */
    public void close();

}

...