Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Update embedded interfaces

...

To describe the interface broadly, it provides the request, its context, the authorized action and resources with the outcome of the authorization and errors if there were any. It also required to be an asynchronous implementation with low latency as it taps into performance-sensitive areas, such as handling produce requests. Resources can be created and destroyed with the configure() and close() methods. Moreover exactly one audit call will happen when calling a certain API as authorizations will be collected throughout the API and passed to the auditor when all information is available, therefore giving the widest possible context to the implementer.

Code Block
languagejava
package org.apache.kafka.server.auditor;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;

import java.util.List;

/**
 * An auditor class that can be used to hook into the request after its completion and do auditing tasks, such as
 * logging to a special file or sending information about the request to external systems.
 * Threading model:
 * <ul>
 *     <li>The auditor implementation must be thread-safe.</li>
 *     <li>The auditor implementation is expected to be asynchronous with low latency as it is used in performance
 *     sensitive areas, such as in handling produce requests.</li>
 *     <li>Any threads or thread pools used for processing remote operations asynchronously can be started during
 *     start(). These threads must be shutdown during close().</li>
 * </ul>
 */
@InterfaceStability.Evolving
public interface Auditor extends Configurable, AutoCloseable {

    class AuditAuthInfo {

        private final ResourcePattern resourcePattern;
        private final AclOperation operation;
        private final AuthorizationResult allowed;
        private final int error;

        public AuditAuthInfo(AclOperation operation, ResourcePattern resourcePattern) {
            this.operation = operation;
            this.resourcePattern = resourcePattern;
            this.allowed = AuthorizationResult.ALLOWED;
            this.error = 0;
        }

        public AuditAuthInfo(AclOperation operation, ResourcePattern resourcePattern, AuthorizationResult allowed, int error) {
            this.operation = operation;
            this.resourcePattern = resourcePattern;
            this.allowed = allowed;
            this.error = error;
        }

        public AclOperation operation() {
            return operation;
        }

        public ResourcePattern resource() {
            return resourcePattern;
        }

        public AuthorizationResult allowed() {
            return allowed;
        }

        public int errorCode() {
            return error;
        }
    }

    /**
     * Any threads or thread pools can be started in this method. These resources must be closed in the {@link #close()}
     * method.
     */
    void start();

    /**
     * Called on request completion before returning the response to the client. It allows auditing multiple resources
     * in the request, such as multiple topics being created.
     * @param event is the request specific data passed down to the auditor. It may be null if there are no specific
     *              information is available for the given audited event type.
     * @param requestContext contains metadata to the request.
     * @param auditAuthInfo is the operation, resource and the outcome of the authorization with the possible error
     *                  coupled together.
     */
    void audit(AuditEvent event, AuthorizableRequestContext requestContext, List<AuditAuthInfo> auditAuthInfo);
}

The KIP also introduces a new configuration called auditors which is a comma-separated list of Auditor implementations. By default it is configured with the LoggingAuditor default implementation.

Code Block
languagejava
titleProperty settings example
auditors=org.apache.kafka.server.auditor.LoggingAuditor,org.whatever.OtherAuditor

AuditEvent

This is a marker interface to serve as a base for all specific event class implementations.

Code Block
languagejava
titleAuditEvent
package org.apache.kafka.server.auditor;

import org.apache.kafka.common.annotation.InterfaceStability;

@InterfaceStability.Evolving
public interface AuditEvent {
}

Specific Event Classes

There will be specific classes defined for each event much like the *Result classes for the AdminClient. Some examples are:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.server.auditor;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

/**
 * An auditor class that can be used to hook into the request after its completion and do auditing tasks, such as
 * logging to a special file or sending information about the request to external systems.
 * Threading model:
 * <ul>
 *     <li>The auditor implementation must be thread-safe.</li>
 *     <li>The auditor implementation is expected to be asynchronous with low latency as it is used in performance
 *     sensitive areas, such as in handling produce requests.</li>
 *     <li>Any threads or thread pools used for processing remote operations asynchronously can be started during
 *     start(). These threads must be shutdown during close().</li>
 * </ul>
 */
@InterfaceStability.Evolving
public interface Auditor extends Configurable, AutoCloseable {

    /**
     * Called on request completion before returning the response to the client. It allows auditing multiple resources
     * in the request, such as multiple topics being created.
     * @param event is the request specific data passed down to the auditor. It may be null if there are no specific
     *              information is available for the given audited event type.
     * @param requestContext contains metadata to the request.
     */
    void audit(AuditEvent event, AuthorizableRequestContext requestContext);
}


Code Block
languagejava
titleAuditInfo
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.server.auditor;

import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.server.authorizer.AuthorizationResult;

/**
 * This class encapsulates the authorization information with the result of the operation. It is used in specific ways
 * in the {@link AuditEvent} implementations. For instance a {@link org.apache.kafka.server.auditor.events.TopicEvent}
 * will have an AuditInfo for every topic as each topic is authorized but in case of an
 * {@link org.apache.kafka.server.auditor.events.AclEvent} authorization only happens for the cluster resource,
 * therefore there will be only one instance of this.
 */
@InterfaceStability.Evolving
public class AuditInfo {

    private final ResourcePattern resourcePattern;
    private final AclOperation operation;
    private final AuthorizationResult allowed;
    private final int error;

    public AuditInfo(AclOperation operation, ResourcePattern resourcePattern) {
        this.operation = operation;
        this.resourcePattern = resourcePattern;
        this.allowed = AuthorizationResult.ALLOWED;
        this.error = 0;
    }

    public AuditInfo(AclOperation operation, ResourcePattern resourcePattern, AuthorizationResult allowed, int error) {
        this.operation = operation;
        this.resourcePattern = resourcePattern;
        this.allowed = allowed;
        this.error = error;
    }

    public AclOperation operation() {
        return operation;
    }

    public ResourcePattern resource() {
        return resourcePattern;
    }

    public AuthorizationResult allowed() {
        return allowed;
    }

    public int errorCode() {
        return error;
    }
}


The KIP also introduces a new configuration called auditors which is a comma-separated list of Auditor implementations. By default it is configured with the LoggingAuditor default implementation.

Code Block
languagejava
titleProperty settings example
auditors=org.apache.kafka.server.auditor.LoggingAuditor,org.whatever.OtherAuditor

AuditEvent

This is a marker interface to serve as a base for all specific event class implementations.

Code Block
languagejava
titleAuditEvent
package org.apache.kafka.server.auditor;

import org.apache.kafka.common.annotation.InterfaceStability;

@InterfaceStability.Evolving
public interface AuditEvent {
}

Specific Event Classes

There will be specific classes defined for each event much like the *Result classes for the AdminClient. Some examples are:

Code Block
languagejava
titleTopicEvent
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.server.auditor.events;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.auditor.AuditEvent;
import org.apache.kafka.server.auditor.AuditInfo;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

@InterfaceStability.Evolving
public class TopicEvent extends AuditEvent {

    public static class AuditedTopic {
        private final String topicName;
        private final int numPartitions;
        private final int replicationFactor;

        private static final int NO_PARTITION_NUMBER = -1;
        private static final int NO_REPLICATION_FACTOR = -1;

        public AuditedTopic(String topicName) {
            this.topicName = topicName;
            this.numPartitions = NO_PARTITION_NUMBER;
            this.replicationFactor = NO_REPLICATION_FACTOR;
        }

        public AuditedTopic(String topicName, int numPartitions, int replicationFactor) {
            this.topicName = topicName;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
        }

        public String name() {
            return topicName;
        }

        public int numPartitions() {
            return numPartitions;
        }

        public int replicationFactor() {
            return replicationFactor;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            AuditedTopic that = (AuditedTopic) o;
            return numPartitions == that.numPartitions &&
                replicationFactor == that.replicationFactor &&
                topicName.equals(that.topicName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topicName, numPartitions, replicationFactor);
        }
    }

    public enum EventType {
        CREATE, DELETE
    }

    private final Map<AuditedTopic, AuditInfo> auditInfo;
    private final EventType eventType;

    public static TopicEvent topicCreateEvent(Map<AuditedTopic, AuditInfo> auditInfo) {
        return new TopicEvent(auditInfo, EventType.CREATE);
    }

    public static TopicEvent topicDeleteEvent(Map<AuditedTopic, AuditInfo> auditInfo) {
        return new TopicEvent(auditInfo, EventType.DELETE);
    }

    public TopicEvent(Map<AuditedTopic, AuditInfo> auditInfo, EventType eventType) {
        this.auditInfo = Collections.unmodifiableMap(auditInfo);
        this.eventType = eventType;
    }

    public Map<AuditedTopic, AuditInfo> auditInfo() {
        return auditInfo;
    }

    public EventType eventType() {
        return eventType;
    }
}


Code Block
languagejava
titleAclEvent
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.server.auditor.events;

import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.auditor.AuditEvent;
import org.apache.kafka.server.auditor.AuditInfo;
import org.apache.kafka.server.authorizer.AclDeleteResult;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

@InterfaceStability.Evolving
public class AclEvent<T, R> extends AuditEvent {

    public enum EventType {
        CREATE, DELETE, DESCRIBE;
    }

    private final AuditInfo clusterAuditInfo;
    private final Set<T> auditedEntities;
    private final EventType eventType;
    private final Map<R, Integer> operationResults;

    public static AclEvent<AclBinding, AclBinding> aclCreateEvent(Set<AclBinding> auditedEntities,
                                                                  AuditInfo clusterAuditInfo) {
        return new AclEvent<>(auditedEntities, clusterAuditInfo, Collections.emptyMap(), EventType.CREATE);
    }

    public static AclEvent<AclBinding, AclBinding> aclCreateEvent(Set<AclBinding> auditedEntities,
                                                                  AuditInfo clusterAuditInfo,
                                                                  Map<AclBinding, Integer> results) {
        return new AclEvent<>(auditedEntities, clusterAuditInfo, results, EventType.CREATE);
    }

    public static AclEvent<AclBindingFilter, AclDeleteResult> aclDeleteEvent(Set<AclBindingFilter> auditedEntities,
                                                                             AuditInfo clusterAuditInfo) {
        return new AclEvent<>(auditedEntities, clusterAuditInfo, Collections.emptyMap(), EventType.DELETE);
    }

    public static AclEvent<AclBindingFilter, AclDeleteResult> aclDeleteEvent(Set<AclBindingFilter> auditedEntities,
                                                                             AuditInfo clusterAuditInfo,
                                                    
Code Block
languagejava
titleTopicEvent
package org.apache.kafka.server.auditor.events;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.auditor.AuditEvent;

import java.util.List;

@InterfaceStability.Evolving
public class TopicEvent implements AuditEvent {

    public enum EventType {
        CREATE, DELETE, PARTITION_CHANGE, REPLICATION_FACTOR_CHANGE;
    }

    public static class TopicInAudit {
        String topicName;
        int numPartitions;
        int replicationFactor;

        public TopicInAudit(String topicName, int numPartitions, int replicationFactor) {
            this.topicName = topicName;
           Map<AclDeleteResult, this.numPartitions = numPartitions;Integer> results) {
        return new AclEvent<>(auditedEntities, clusterAuditInfo, results, this.replicationFactor = replicationFactor;
EventType.DELETE);
    }

    public static AclEvent<AclBindingFilter, AclBinding>  }
aclDescribeEvent(Set<AclBindingFilter> auditedEntities,
        public String name() {
            return topicName;
        }

        public int numPartitions() {
            return numPartitions;
        }

        public int replicationFactor() {
 AuditInfo clusterAuditInfo,
           return replicationFactor;
        }
    }

    private List<TopicInAudit> topics;
    private EventType eventType;

    public TopicEvent(List<TopicInAudit>  topics, EventType eventType) {
        this.topics = topics;
        this.eventType = eventType;
    }

    publicMap<AclBinding, List<TopicInAudit>Integer> topics(results) {
        return topicsnew AclEvent<>(auditedEntities, clusterAuditInfo, results, EventType.DESCRIBE);
    }

    public AclEvent(Set<T> auditedEntities, AuditInfo clusterAuditInfo, Map<R, Integer> operationResults, public EventType eventType() {
        this.auditedEntities return eventType= Collections.unmodifiableSet(auditedEntities);
    }
}
Code Block
languagejava
titleAclEvent
package org.apache.kafka.server.auditor.events;

import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.auditor.AuditEvent;

import java.util.List;

@InterfaceStability.Evolving
public class AclEvent implements AuditEvent {  this.clusterAuditInfo = clusterAuditInfo;
        this.eventType = eventType;
        this.operationResults = Collections.unmodifiableMap(operationResults);
    }

    public enumSet<T> EventTypeauditedEntities() {
        CREATE, DELETE, DESCRIBEreturn auditedEntities;
    }

    private final List<AclBinding> bindings;

public AuditInfo clusterAuditInfo() {
        return clusterAuditInfo;
    }

    public Map<R, Integer> AclEventoperationResults(List<AclBinding> bindings) {
        this.bindings = bindingsreturn operationResults;
    }

    public List<AclBinding>EventType bindingseventType() {
        return bindingseventType;
    }
}

Generic Events

Not all request types will need to be accompanied with the corresponding AuditEvent class as there are 50+ Kafka APIs where many are control requests which may or may not be relevant from the user's perspective and it would be very labour intensive to code and maintain these. To overcome this the auditor may pass null as the AuditEvent parameter in the audit method.

...