...

Current state: Under Discussion

Discussion thread: here (old thread)

JIRA: KAFKA-9413

Motivation

Auditing is a reporting functionality that notifies other subsystems of the outcome of an authorization. It is used to check the activity of certain entities within a cluster. Most businesses need to obtain audit information when someone changes the cluster configuration (such as creating, deleting, modifying, or describing a topic or ACL), or even to record client events in some environments. Because audit requirements are so broad, it is impractical to support all of them directly, so users need a way to plug in implementations that provide the capabilities they require.

As a broader requirement, reporting can target multiple services attached to Kafka, each capturing different aspects: not just the outcome of the authorization but the outcome of the whole action. In a simple use case, one might log topic-creation events along with whether they were authorized and whether they succeeded. In a more complicated use case, one can report client and topic events to Apache Atlas, which can build a dependency graph of these events to visualize how clients and topics are interconnected.

In this KIP we try to provide a generic solution for this broader interpretation of auditing, one that can be applied to a variety of use cases such as those described above.

Public Interfaces

Auditor

...

The Auditor is an interface that provides the extension point for implementing auditing and reporting. In terms of form, it makes sense to provide an interface similar to the Authorizer, since the two are closely related: they are flip sides of each other. The snippet defining the interface can be found below. As with the Authorizer, we implement this in Java in the clients module so implementors won’t have to depend on the core module and ultimately on Scala.

To describe the interface broadly: it provides the request, its context, the authorized action, and the resources, together with the outcome of the authorization and any errors. Implementations are also required to be asynchronous with low latency, as the interface taps into performance-sensitive areas such as handling produce requests. Resources can be created and destroyed in the start() and close() methods.

Code Block
languagejava
titleAuditor
linenumberstrue
package org.apache.kafka.server.auditor;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

import java.util.Collections;
import java.util.Map;

/**
 * An auditor class that can be used to hook into the request after its completion and do auditing tasks, such as
 * logging to a special file or sending information about the request to external systems.
 * Threading model:
 * <ul>
 * <li>The auditor implementation must be thread-safe.</li>
 * <li>The auditor implementation is expected to be asynchronous with low latency as it is used in performance
 * sensitive areas, such as in handling produce requests.</li>
 * <li>Any threads or thread pools used for processing remote operations asynchronously can be started during
 * start(). These threads must be shutdown during close().</li>
 * </ul>
 */
public interface Auditor extends Configurable, AutoCloseable {

    /**
     * Any threads or thread pools can be started in this method. These resources must be closed in the
     * {@link #close()} method.
     */
    void start();

    /**
     * Called on request completion before returning the response to the client. It allows auditing multiple resources
     * in the request, such as multiple topics being created.
     * @param request is the request that has been issued by the client.
     * @param requestContext contains metadata to the request.
     * @param action is the operation that is being performed by the client.
     * @param isAllowed is a mapping of resources to booleans to represent whether performing the action is allowed on
     *                  the resource in the key.
     * @param errors is a mapping of resources to errors to represent whether performing the action on the given
     *               resource was successful or not.
     */
    void audit(AbstractRequest request,
               AuthorizableRequestContext requestContext,
               AclOperation action,
               Map<ResourcePattern, Boolean> isAllowed,
               Map<ResourcePattern, Errors> errors);
}
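
The threading requirements stated for the Auditor (thread-safe, asynchronous, low latency, threads started in start() and stopped in close()) could be met with a bounded queue drained by a single worker thread. The following is only a minimal sketch under stated assumptions: SimpleAuditor, AsyncAuditor, and droppedCount() are hypothetical names, and plain strings stand in for the Kafka request, context, resource, and error types so the example compiles without Kafka on the classpath. Dropping events when the queue is full is one possible policy chosen here for illustration; the KIP does not prescribe it.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the proposed Auditor interface.
// Plain strings replace the Kafka request, context, resource and error types.
interface SimpleAuditor extends AutoCloseable {
    void start();

    void audit(String principal, String action,
               Map<String, Boolean> isAllowed, Map<String, String> errors);

    @Override
    void close();
}

// Assumption: a bounded queue plus one worker thread; not the KIP's implementation.
final class AsyncAuditor implements SimpleAuditor {
    private final BlockingQueue<String> queue;
    private final Consumer<String> sink;
    private final AtomicLong dropped = new AtomicLong();
    private volatile boolean running;
    private Thread worker;

    AsyncAuditor(int capacity, Consumer<String> sink) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.sink = sink;
    }

    @Override
    public void start() {
        running = true;
        worker = new Thread(this::drain, "audit-worker");
        worker.setDaemon(true);
        worker.start();
    }

    @Override
    public void audit(String principal, String action,
                      Map<String, Boolean> isAllowed, Map<String, String> errors) {
        // Snapshot the maps into a string on the caller's thread, then hand off.
        String line = principal + " " + action
                + " allowed=" + new TreeMap<>(isAllowed)
                + " errors=" + new TreeMap<>(errors);
        // offer() never blocks: when the queue is full the event is dropped and counted.
        if (!queue.offer(line)) {
            dropped.incrementAndGet();
        }
    }

    private void drain() {
        try {
            // Keep draining after close() is requested until the queue is empty.
            while (running || !queue.isEmpty()) {
                String line = queue.poll(50, TimeUnit.MILLISECONDS);
                if (line != null) {
                    sink.accept(line);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        running = false;
        if (worker == null) {
            return;
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

In this sketch the request-handling thread only builds a string and calls a non-blocking offer(), so a slow sink cannot stall request processing; the cost is that events may be lost under sustained overload, which an implementation would likely want to expose as a metric.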

The KIP also introduces a new configuration called auditors, which is a comma-separated list of Auditor implementations. By default it is configured with the LoggingAuditor default implementation.

Code Block
languagejava
titleProperty settings example
auditors=org.apache.kafka.server.auditor.LoggingAuditor,whatever.OtherAuditor
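
To illustrate how a comma-separated list of class names such as the one above might be turned into plugin instances, here is a minimal, self-contained sketch using plain reflection. It is not how the broker is stated to load auditors: PluggableAuditor, FirstAuditor, SecondAuditor, and AuditorLoader are hypothetical names, and a real broker would presumably rely on its existing configuration machinery instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical marker interface standing in for the proposed Auditor.
interface PluggableAuditor {
}

// Two hypothetical implementations used only to demonstrate loading.
final class FirstAuditor implements PluggableAuditor {
}

final class SecondAuditor implements PluggableAuditor {
}

final class AuditorLoader {
    // Reads the "auditors" property and instantiates each listed class in order.
    static List<PluggableAuditor> load(Properties props) {
        List<PluggableAuditor> auditors = new ArrayList<>();
        for (String name : props.getProperty("auditors", "").split(",")) {
            String className = name.trim();
            if (className.isEmpty()) {
                continue; // tolerate an empty value or stray commas
            }
            try {
                auditors.add(Class.forName(className)
                        .asSubclass(PluggableAuditor.class)
                        .getDeclaredConstructor()
                        .newInstance());
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalArgumentException("Cannot load auditor " + className, e);
            }
        }
        return auditors;
    }
}
```

Each listed class needs a no-argument constructor in this sketch; configuration would then be passed in afterwards through configure(), which is what the Configurable interface is for.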

Proposed Changes

Users will be able to create a plugin by implementing the Auditor interface.
One or more Auditor implementations can be configured via the auditors configuration as a comma-separated list of class names.

These implementations will get access to the broker configuration via the configure(Map<String, ?> configs) method (through org.apache.kafka.common.Configurable).

Reference Implementation

As part of the KIP we will define the interface above and implement the hooks in the various handle calls in KafkaApis, similarly to the Authorizer. The auditing is done just before the response is sent back, as this is a common point where all the required parameters are available. Besides this, we will provide a default logger implementation that logs some of the audited events, such as topic creation, topic deletion, config changes, and ACL changes, to a file.
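
As a rough illustration of what a logging implementation might record, the sketch below combines the per-resource authorization result and the per-resource error into one log line per resource. Everything here is an assumption made for illustration: the class name LoggingAuditorSketch, the logger name kafka.audit, the line format, the use of java.util.logging, and the use of plain strings (with "NONE" meaning no error) in place of the Kafka resource and error types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;

final class LoggingAuditorSketch {
    // Hypothetical logger name; a real implementation would use the broker's logging setup.
    private static final Logger LOG = Logger.getLogger("kafka.audit");

    // Builds one line per resource, sorted by resource name for stable output.
    static List<String> describe(String principal, String action,
                                 Map<String, Boolean> isAllowed,
                                 Map<String, String> errors) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : new TreeMap<>(isAllowed).entrySet()) {
            String resource = entry.getKey();
            String outcome;
            if (!entry.getValue()) {
                outcome = "DENIED"; // authorization failed, so the action was not attempted
            } else {
                String error = errors.getOrDefault(resource, "NONE");
                outcome = "NONE".equals(error) ? "SUCCESS" : "FAILED(" + error + ")";
            }
            lines.add(String.format("principal=%s action=%s resource=%s outcome=%s",
                    principal, action, resource, outcome));
        }
        return lines;
    }

    // Writes the lines to the audit logger.
    static void audit(String principal, String action,
                      Map<String, Boolean> isAllowed, Map<String, String> errors) {
        describe(principal, action, isAllowed, errors).forEach(LOG::info);
    }
}
```

Keeping the formatting in a pure method such as describe() makes it straightforward to unit test the log content without inspecting log files.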

Any other specific implementations will live in their respective projects, as is done with the Authorizer. This shouldn’t be an extra burden on these implementations, as they usually already implement the Authorizer or already have some client-side Kafka dependencies.

Compatibility, Deprecation, and Migration Plan

This is entirely new functionality so there are no compatibility, deprecation, or migration concerns.

Testing

The correctness of the LoggingAuditor and the data propagation between KafkaApis and the Auditor will be covered by unit tests using mocks.

Rejected Alternatives

...

Client Side Auditing

Some auditing actions can be quite heavy, such as auditing client actions, for example detecting which client produces to which topics. Doing some of these on the client side was considered, but it faces multiple obstacles:

  • Auditing information still needs to be collected in a central place, so it would require extra configuration on the client side.
  • Repetition of the same events should be avoided, which means we would have to implement a cache on the client side. This makes the clients heavier, which we would like to avoid. The same caching would also apply to the brokers, so implementation-wise we wouldn’t be ahead.