Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Changes requested from Mickael

...

Code Block
languagejava
titleAuditor
linenumberstrue
package org.apache.kafka.server.auditor;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

import java.util.List;
import java.util.Map;

/**
 * An auditor class that can be used to hook into the request after its completion and do auditing tasks, such as
 * logging to a special file or sending information about the request to external systems.
 * Threading model:
 * <ul>
 *     <li>The auditor implementation must be thread-safe.</li>
 *     <li>The auditor implementation is expected to be asynchronous with low latency as it is used in performance
 *     sensitive areas, such as in handling produce requests.</li>
 *     <li>Any threads or thread pools used for processing remote operations asynchronously can be started during
 *     start(). These threads must be shutdown during close().</li>
 * </ul>
 */
public interface Auditor extends Configurable, AutoCloseable {

    class AuditInfo {
        private final ResourcePattern resource;
        private final BooleanResourcePattern allowedresource;
        private final Errors error;

        public static class Builder {
            private ResourcePattern resource;
            private Boolean allowed;
        private final   private Errors error;


        public AuditInfo(ResourceType resourceType,  public Builder resource(ResourcePattern resourceString resourceName) {
                this.resource = resource;
                return this;
            }

            public Builder allowed(Boolean allowed) {
  new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
              this.allowed = allowedtrue;
                return this;
            }

            public Builder error(Errors error) {
                this.error = error;
                return thisErrors.NONE;
            }

            public AuditInfo build() {
                return new AuditInfo(resource, allowed, error);
            }
        }

        public AuditInfo(ResourcePattern resourceResourceType resourceType, String resourceName, Boolean allowed, Errors error) {
            this.resource = resource new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
            this.allowed = allowed;
            this.error = errorErrors.NONE;
        }

        public AuditInfo(ResourceType resourceType, String resourceName,
                         Boolean allowed, Errors error) {
            this.resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
            this.allowed = allowed;
            this.error = error;
        }

        public AuditInfo(ResourceType resourceType, String resourceName, PatternType patternType,
                         Boolean allowed, Errors error) {
            this.resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERALpatternType);
            this.allowed = allowed;
            this.error = Errors.NONEerror;
        }

        public ResourcePattern resource() {
            return resource;
        }

        public Boolean allowed() {
            return allowed;
        }

        public Errors error() {
            return error;
        }
    }

    /**
     * Any threads or thread pools can be started in this method. These resources must be closed in the {@link #close()}
     * method.
     */
    void start();

    /**
     * Called on request completion before returning the response to the client. It allows auditing multiple resources
     * in the request, such as multiple topics being created.
     * @param request is the request that has been issued by the client.
     * @param requestContext contains metadata to the request.
     * @param auditInfo is the operation, resource and the outcome of the authorization with the possible error
     *                  coupled together.
     */
    void audit(AbstractRequest request,
               AuthorizableRequestContext requestContext,
               Map<AclOperation, List<AuditInfo>> auditInfo);
}

The KIP also introduces a new configuration called auditors which is a comma-separated list of Auditor implementations. By default it is configured with the LoggingAuditor default implementation.

...

As part of the KIP we will define the interface above, implement the hooks in the various handle calls in KafkaApis similarly to the Authorizer, but doing the auditing before sending the response back as this is a common point where all the required parameters are ready. Besides this we will have a default logger implementation that logs some of the audited events such as topic creation, deletion, config changes and ACL changes into a file. Any other specific implementations implementation will live in the respective projects as we do it with the Authorizer. This shouldn’t be an extra burden on these specific implementations as they usually already implement the Authorizer or already have some client side Kafka dependencies.

Default Implementation

We will have a default logger implementation that logs the following audited events under a logger named auditLogger:

  • Topic events: describe, list, create, delete, partition number change, replication factor change
  • Config events: describe, alter config (incremental as well as legacy)
  • ACL events: describe, create, delete
  • replica log dirs: describe, alter
  • Reassignment: alter, list
  • Groups: describe, delete
  • Scram credentials: describe, alter
  • Client quotas: describe, alter
  • Delete records
  • Delete offsets

Compatibility, Deprecation, and Migration Plan

...