...

Broadly, the interface provides the request, its context, the authorized action and resources together with the outcome of the authorization, and any errors that occurred. It is also required to be an asynchronous, low-latency implementation because it taps into performance-sensitive areas such as the handling of produce requests. Resources can be created and destroyed with the configure() and close() methods. Moreover, exactly one audit call happens per API call: authorizations are collected throughout the API and passed to the auditor once all information is available, which gives the implementer the widest possible context.

Code Block
languagejava
package org.apache.kafka.server.auditor;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;

import java.util.List;

/**
 * An auditor class that can be used to hook into the request after its completion and do auditing tasks, such as
 * logging to a special file or sending information about the request to external systems.
 * Threading model:
 * <ul>
 *     <li>The auditor implementation must be thread-safe.</li>
 *     <li>The auditor implementation is expected to be asynchronous with low latency as it is used in performance
 *     sensitive areas, such as in handling produce requests.</li>
 *     <li>Any threads or thread pools used for processing remote operations asynchronously can be started during
 *     start(). These threads must be shutdown during close().</li>
 * </ul>
 */
@InterfaceStability.Evolving
public interface Auditor extends Configurable, AutoCloseable {

    class AuditAuthInfo {

        private final ResourcePattern resourcePattern;
        private final AclOperation operation;
        private final AuthorizationResult allowed;
        private final int error;

        public AuditAuthInfo(AclOperation operation, ResourcePattern resourcePattern) {
            this.operation = operation;
            this.resourcePattern = resourcePattern;
            this.allowed = AuthorizationResult.ALLOWED;
            this.error = 0;
        }

        public AuditAuthInfo(AclOperation operation, ResourcePattern resourcePattern, AuthorizationResult allowed, int error) {
            this.operation = operation;
            this.resourcePattern = resourcePattern;
            this.allowed = allowed;
            this.error = error;
        }

        public AclOperation operation() {
            return operation;
        }

        public ResourcePattern resource() {
            return resourcePattern;
        }

        public AuthorizationResult allowed() {
            return allowed;
        }

        public int errorCode() {
            return error;
        }
    }

    /**
     * Any threads or thread pools can be started in this method. These resources must be closed in the {@link #close()}
     * method.
     */
    void start();

    /**
     * Called on request completion before returning the response to the client. It allows auditing multiple resources
     * in the request, such as multiple topics being created.
     * @param event is the request specific data passed down to the auditor. It may be null if no specific
     *              information is available for the given audited event type.
     * @param requestContext contains metadata about the request.
     * @param auditAuthInfo is the operation, resource and the outcome of the authorization with the possible error
     *                  coupled together.
     */
    void audit(AuditEvent event, AuthorizableRequestContext requestContext, List<AuditAuthInfo> auditAuthInfo);
}
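
The threading requirements in the Javadoc above (thread-safe, asynchronous, low latency) can be met by handing records to a bounded queue on the request path and draining it on a background thread. The following is a minimal sketch of that pattern only and is not part of the KIP: the class name QueueingAuditSink, its capacity parameter, the drop-when-full policy, and the use of plain strings in place of AuditEvent and AuditAuthInfo are all assumptions, chosen so that the example compiles without Kafka on the classpath.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of the KIP: illustrates the asynchronous hand-off only.
// Audit records are plain strings so the sketch has no Kafka dependency.
class QueueingAuditSink implements AutoCloseable {
    private final BlockingQueue<String> queue;
    // Stands in for a special log file or an external system.
    private final List<String> written = new CopyOnWriteArrayList<>();
    private final AtomicLong dropped = new AtomicLong();
    private final Thread worker;
    private volatile boolean running = true;

    QueueingAuditSink(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.worker = new Thread(this::drain, "audit-sink");
        this.worker.setDaemon(true);
    }

    // The point at which an auditor would start its threads.
    void start() {
        worker.start();
    }

    // Called on the request path: never blocks; drops and counts the record if the queue is full.
    void audit(String record) {
        if (!queue.offer(record)) {
            dropped.incrementAndGet();
        }
    }

    private void drain() {
        try {
            // Keep draining until close() has been called and the queue is empty.
            while (running || !queue.isEmpty()) {
                String record = queue.poll(50, TimeUnit.MILLISECONDS);
                if (record != null) {
                    written.add(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> written() {
        return written;
    }

    long dropped() {
        return dropped.get();
    }

    // Shuts the worker down after it has flushed what is already queued.
    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Dropping records when the queue is full keeps the request path non-blocking at the cost of completeness; an implementation that must not lose audit records would need a different overflow policy.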

The KIP also introduces a new configuration called auditors, which is a comma-separated list of Auditor implementations. By default it is configured with the LoggingAuditor default implementation.

...

Code Block
languagejava
titleTopicEvent
package org.apache.kafka.server.auditor.events;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.auditor.AuditEvent;

import java.util.List;

@InterfaceStability.Evolving
public class TopicEvent implements AuditEvent {

    public enum EventType {
        CREATE, DELETE, PARTITION_CHANGE, REPLICATION_FACTOR_CHANGE;
    }

    public static class TopicInAudit {
        String topicName;
        int numPartitions;
        int replicationFactor;

        public TopicInAudit(String topicName, int numPartitions, int replicationFactor) {
            this.topicName = topicName;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
        }

        public String name() {
            return topicName;
        }

        public int numPartitions() {
            return numPartitions;
        }

        public int replicationFactor() {
            return replicationFactor;
        }
    }

    private List<TopicInAudit> topics;

    private EventType eventType;

    public TopicEvent(List<TopicInAudit> topics, EventType eventType) {
        this.topics = topics;
        this.eventType = eventType;
    }

    public List<TopicInAudit> topics() {
        return topics;
    }

    public EventType eventType() {
        return eventType;
    }
}


Code Block
languagejava
titleAclEvent
package org.apache.kafka.server.auditor.events;

import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.auditor.AuditEvent;

import java.util.List;

@InterfaceStability.Evolving
public class AclEvent implements AuditEvent {

    public enum EventType {
        CREATE, DELETE, DESCRIBE;
    }

    private final List<AclBinding> bindings;

    public AclEvent(List<AclBinding> bindings) {
        this.bindings = bindings;
    }

    public List<AclBinding> bindings() {
        return bindings;
    }
}

Generic Events

Not every request type needs a corresponding AuditEvent class: there are 50+ Kafka APIs, many of which are control requests that may or may not be relevant from the user's perspective, and coding and maintaining an event class for each would be very labour intensive. To overcome this, null may be passed as the AuditEvent parameter of the audit method.
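
Because the event may be null, an implementation has to fall back to the generic request metadata when no event-specific data is present. The sketch below shows one way to structure that check. It is illustrative only: SketchEvent, SketchTopicEvent and NullTolerantFormatter are hypothetical stand-ins for the KIP's types, and the request type and principal are passed as plain strings instead of an AuthorizableRequestContext.

```java
import java.util.List;

// Stand-in for AuditEvent (assumption), so the sketch compiles on its own.
interface SketchEvent { }

// Stand-in for a request-specific event such as TopicEvent (assumption).
final class SketchTopicEvent implements SketchEvent {
    private final List<String> topicNames;

    SketchTopicEvent(List<String> topicNames) {
        this.topicNames = topicNames;
    }

    List<String> topicNames() {
        return topicNames;
    }
}

// Hypothetical helper that turns one audit call into one log line.
final class NullTolerantFormatter {
    // `event` may be null for request types that have no dedicated event class;
    // in that case only the generic request metadata is reported.
    static String describe(SketchEvent event, String requestType, String principal) {
        String prefix = principal + " " + requestType;
        if (event == null) {
            return prefix + " (no event details)";
        }
        if (event instanceof SketchTopicEvent) {
            SketchTopicEvent topicEvent = (SketchTopicEvent) event;
            return prefix + " topics=" + String.join(",", topicEvent.topicNames());
        }
        // Unknown event subtype: fall back to the class name rather than failing.
        return prefix + " event=" + event.getClass().getSimpleName();
    }
}
```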

Proposed Changes

As part of the KIP we will define the interface above and implement the hooks in the various handle calls in KafkaApis, similarly to the Authorizer. Auditing happens just before the response is sent back, as this is a common point where all the required parameters are ready. Specific implementations will live in their respective projects, as is done with the Authorizer. This shouldn’t be an extra burden on those implementations, as they usually already implement the Authorizer or already have some client-side Kafka dependencies.
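
The hook placement described here, collecting authorization outcomes while a request is handled and auditing exactly once before the response is returned, can be sketched as follows. This is a simplified illustration under stated assumptions and not the actual KafkaApis code: AuthCollector and HandlerSketch are hypothetical names, the authorizer is reduced to a Predicate over topic names, and the auditor to a BiConsumer that receives the request type and the collected outcomes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

// Hypothetical per-request collector of authorization outcomes.
final class AuthCollector {
    private final List<String> results = new ArrayList<>();

    void record(String operation, String resource, boolean allowed) {
        results.add(operation + ":" + resource + ":" + (allowed ? "ALLOWED" : "DENIED"));
    }

    List<String> results() {
        return results;
    }
}

// Hypothetical handler mimicking the shape of a request handler: authorize each
// resource, build the response, audit once, then return the response.
final class HandlerSketch {
    static List<String> handleCreateTopics(List<String> topics,
                                           Predicate<String> authorizer,
                                           BiConsumer<String, List<String>> auditor) {
        AuthCollector collector = new AuthCollector();
        List<String> created = new ArrayList<>();
        for (String topic : topics) {
            boolean allowed = authorizer.test(topic);
            collector.record("CREATE", topic, allowed);
            if (allowed) {
                created.add(topic);
            }
        }
        // Single audit call, made once all outcomes are known and before the response is returned.
        auditor.accept("CREATE_TOPICS", collector.results());
        return created;
    }
}
```

Auditing at this single point means that both allowed and denied resources of the same request reach the auditor together.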

...