Comment: Added event interface and examples

...

In this KIP we aim to provide a generic solution that supports a broad interpretation of auditing and applies to a variety of use cases, such as the ones described above.

Public Interfaces


Auditor

Developers will implement an interface that serves as the extension point for auditing and reporting. It makes sense for this interface to resemble the Authorizer, since the two are closely related: they are flip sides of each other. The snippet defining the interface can be found below. As with the Authorizer, we implement it in Java in the clients module so that implementors won’t have to depend on the core module and ultimately on Scala.

Broadly, the interface provides the request, its context, and the authorized actions and resources, together with the outcome of each authorization and any errors. Implementations are required to be asynchronous with low latency, as the auditor taps into performance-sensitive areas such as handling produce requests. Resources can be created and released with the start() and close() methods. Moreover, exactly one audit call happens per API call: authorizations are collected while the request is handled and passed to the auditor once all information is available, giving the implementer the widest possible context.

Code Block
languagejava
titleAuditor
linenumberstrue
package org.apache.kafka.server.auditor;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;

import java.util.List;

/**
 * An auditor class that can be used to hook into the request after its completion and do auditing tasks, such as
 * logging to a special file or sending information about the request to external systems.
 * Threading model:
 * <ul>
 *     <li>The auditor implementation must be thread-safe.</li>
 *     <li>The auditor implementation is expected to be asynchronous with low latency as it is used in performance
 *     sensitive areas, such as in handling produce requests.</li>
 *     <li>Any threads or thread pools used for processing remote operations asynchronously can be started during
 *     start(). These threads must be shutdown during close().</li>
 * </ul>
 */
@InterfaceStability.Evolving
public interface Auditor extends Configurable, AutoCloseable {

    class AuditInfo {

        private final ResourcePattern resourcePattern;
        private final AclOperation operation;
        private final AuthorizationResult allowed;
        private final int error;

        public AuditInfo(AclOperation operation, ResourcePattern resourcePattern) {
            this.operation = operation;
            this.resourcePattern = resourcePattern;
            this.allowed = AuthorizationResult.ALLOWED;
            this.error = 0;
        }

        public AuditInfo(AclOperation operation, ResourcePattern resourcePattern, AuthorizationResult allowed, int error) {
            this.operation = operation;
            this.resourcePattern = resourcePattern;
            this.allowed = allowed;
            this.error = error;
        }

        public AclOperation operation() {
            return operation;
        }

        public ResourcePattern resource() {
            return resourcePattern;
        }

        public AuthorizationResult allowed() {
            return allowed;
        }

        public int errorCode() {
            return error;
        }
    }

    /**
     * Any threads or thread pools can be started in this method. These resources must be closed in the {@link #close()}
     * method.
     */
    void start();

    /**
     * Called on request completion before returning the response to the client. It allows auditing multiple resources
     * in the request, such as multiple topics being created.
     * @param event is the request specific data passed down to the auditor.
     * @param requestContext contains metadata to the request.
     * @param auditInfo is the operation, resource and the outcome of the authorization with the possible error
     *                  coupled together.
     */
    void audit(AuditEvent event, AuthorizableRequestContext requestContext, List<AuditInfo> auditInfo);
}
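
The threading requirements above (thread-safe, asynchronous, low latency, threads started in start() and stopped in close()) can be illustrated with a minimal sketch. To keep it compilable without Kafka on the classpath, it uses a hypothetical, simplified `SimpleAuditor` interface in place of the real Auditor, plain strings in place of AuditEvent and AuditInfo, and an in-memory list as the sink. The class name `QueueingAuditor` and the drop-on-full policy are assumptions made for illustration, not part of the KIP.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for the KIP's Auditor so the sketch compiles without Kafka.
interface SimpleAuditor extends AutoCloseable {
    void start();
    void audit(String event, String principal, List<String> auditInfo);
    @Override
    void close();
}

// Hands records to a background thread through a bounded queue so audit() never blocks.
class QueueingAuditor implements SimpleAuditor {
    private final BlockingQueue<String> queue;
    // Stands in for a log file or an external system.
    private final List<String> sink = new CopyOnWriteArrayList<>();
    private volatile boolean running;
    private Thread worker;

    QueueingAuditor(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    @Override
    public void start() {
        running = true;
        worker = new Thread(this::drain, "audit-writer");
        worker.setDaemon(true);
        worker.start();
    }

    @Override
    public void audit(String event, String principal, List<String> auditInfo) {
        // offer() returns immediately; if the queue is full the record is dropped
        // instead of stalling the request-handling thread (an assumed policy).
        queue.offer(principal + " " + event + " " + auditInfo);
    }

    private void drain() {
        try {
            // Keep going until close() has been called and the queue is empty.
            while (running || !queue.isEmpty()) {
                String record = queue.poll(50, TimeUnit.MILLISECONDS);
                if (record != null) {
                    sink.add(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        running = false;
        try {
            if (worker != null) {
                worker.join(); // wait for the remaining records to be written
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> written() {
        return sink;
    }
}
```

Whether a real implementation should drop, block, or spill to disk when the queue is full is a design decision the KIP leaves to the implementer; dropping is used here only because it keeps the request path non-blocking.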

The KIP also introduces a new configuration called auditors, which is a comma-separated list of Auditor implementations. By default it is set to the LoggingAuditor implementation.

Code Block
languagejava
titleProperty settings example
auditors=org.apache.kafka.server.auditor.LoggingAuditor,org.whatever.OtherAuditor
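
As a rough illustration of how a comma-separated list of class names could be turned into auditor instances, the sketch below uses plain reflection. It is not how the broker loads the setting; `PluggableAuditor`, `LoggingAuditorStub`, `OtherAuditorStub` and `AuditorLoader` are hypothetical names introduced only so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the KIP's Auditor interface.
interface PluggableAuditor {
    String name();
}

// Hypothetical implementations standing in for LoggingAuditor and a third-party auditor.
class LoggingAuditorStub implements PluggableAuditor {
    public String name() {
        return "logging";
    }
}

class OtherAuditorStub implements PluggableAuditor {
    public String name() {
        return "other";
    }
}

class AuditorLoader {
    // Splits the property value on commas and instantiates each class through its no-arg constructor.
    static List<PluggableAuditor> load(String auditorsProperty) {
        List<PluggableAuditor> auditors = new ArrayList<>();
        for (String className : auditorsProperty.split(",")) {
            String trimmed = className.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas and whitespace
            }
            try {
                Class<? extends PluggableAuditor> type =
                        Class.forName(trimmed).asSubclass(PluggableAuditor.class);
                auditors.add(type.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalArgumentException("Cannot load auditor " + trimmed, e);
            }
        }
        return auditors;
    }
}
```

The instances are returned in the order in which they are listed, so each configured auditor could then be invoked in turn.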

AuditEvent

This is a marker interface that serves as the base for all specific event classes.

Code Block
languagejava
titleAuditEvent
package org.apache.kafka.server.auditor;

import org.apache.kafka.common.annotation.InterfaceStability;

@InterfaceStability.Evolving
public interface AuditEvent {
}

Specific Event Classes

There will be a specific class defined for each event, much like the *Result classes for the AdminClient. Some examples are:

Code Block
languagejava
titleCreateTopicEvent
package org.apache.kafka.server.auditor.events;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.auditor.AuditEvent;

import java.util.List;

@InterfaceStability.Evolving
public class TopicCreateEvent implements AuditEvent {

    public static class CreatedTopic {
        String topicName;
        int numPartitions;
        int replicationFactor;

        public CreatedTopic(String topicName, int numPartitions, int replicationFactor) {
            this.topicName = topicName;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
        }

        public String name() {
            return topicName;
        }

        public int numPartitions() {
            return numPartitions;
        }

        public int replicationFactor() {
            return replicationFactor;
        }
    }

    private List<CreatedTopic> topics;

    public TopicCreateEvent(List<CreatedTopic> topics) {
        this.topics = topics;
    }

    public List<CreatedTopic> topics() {
        return topics;
    }
}


Code Block
languagejava
titleCreateAclEvent
package org.apache.kafka.server.auditor.events;

import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.auditor.AuditEvent;

import java.util.List;

@InterfaceStability.Evolving
public class CreateAclEvent implements AuditEvent {

    private final List<AclBinding> bindings;

    public CreateAclEvent(List<AclBinding> bindings) {
        this.bindings = bindings;
    }

    public List<AclBinding> bindings() {
        return bindings;
    }
}
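
Because AuditEvent is only a marker interface, an auditor would presumably inspect the concrete type of the event it receives. The following sketch shows one way to do that with instanceof checks. `Event`, `TopicsCreated`, `AclsCreated` and `EventFormatter` are hypothetical, trimmed-down stand-ins for the KIP's classes, used so that the example compiles without Kafka.

```java
import java.util.List;

// Hypothetical stand-in for the AuditEvent marker interface.
interface Event {
}

// Hypothetical stand-in for a topic creation event; carries only the topic names.
class TopicsCreated implements Event {
    final List<String> topicNames;

    TopicsCreated(List<String> topicNames) {
        this.topicNames = topicNames;
    }
}

// Hypothetical stand-in for an ACL creation event; bindings are plain strings here.
class AclsCreated implements Event {
    final List<String> bindings;

    AclsCreated(List<String> bindings) {
        this.bindings = bindings;
    }
}

class EventFormatter {
    // Turns an event into a log line by checking its concrete type.
    static String describe(Event event) {
        if (event instanceof TopicsCreated) {
            return "created topics " + ((TopicsCreated) event).topicNames;
        } else if (event instanceof AclsCreated) {
            return "created ACLs " + ((AclsCreated) event).bindings;
        }
        // Unknown event types are still reported rather than silently dropped.
        return "unhandled event " + event.getClass().getSimpleName();
    }
}
```

The fallback branch matters if new event classes are added later: an auditor written against an older set of events would still record that something happened.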

Proposed Changes

As part of the KIP we will define the interface above and implement hooks in the various handle calls in KafkaApis, similarly to the Authorizer. The auditing happens just before the response is sent back, as this is a common point where all the required parameters are available. Specific implementations will live in their respective projects, as is done with the Authorizer. This shouldn’t place an extra burden on these implementations, as they usually already implement the Authorizer or already have client-side Kafka dependencies.
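
The idea of collecting authorization results while a request is handled and invoking the auditor exactly once before the response is sent can be sketched as follows. This is not the KafkaApis code; `Decision` and `RequestAuditScope` are hypothetical names, and strings and a boolean stand in for the operation, resource and AuthorizationResult carried by AuditInfo.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the (operation, resource, outcome) triple carried by AuditInfo.
class Decision {
    final String operation;
    final String resource;
    final boolean allowed;

    Decision(String operation, String resource, boolean allowed) {
        this.operation = operation;
        this.resource = resource;
        this.allowed = allowed;
    }
}

// Collects every authorization decision made while handling one request.
class RequestAuditScope {
    private final List<Decision> decisions = new ArrayList<>();

    // Called wherever the handler authorizes a resource; returns the outcome unchanged.
    boolean record(String operation, String resource, boolean allowed) {
        decisions.add(new Decision(operation, resource, allowed));
        return allowed;
    }

    // Called once, just before the response is sent, with everything collected so far.
    void complete(Consumer<List<Decision>> auditor) {
        auditor.accept(Collections.unmodifiableList(decisions));
    }
}
```

A handler creating two topics would call record(...) once per topic and complete(...) a single time, so the auditor sees both outcomes together in one call.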

...