Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Sources of an audit system are where the auditing information is initially generated. Kafka server could be a source since it can access every request and its response.

Terminology

Auditor: The generator of the initial auditing information. It is usually embedded in another application/service. For example, it could be a pluggable broker component that can inspect all requests and responses on a broker.

Proposed Changes

  1. Define an Observer interface and provide a no-operation observer implementation. The observer can be used to store and report auditing statistics and etc.

  2. Add a configuration property in the KafkaConfig class to allow users to specify the specific implementation of the Observer interface. The default value of this added field is the NoOpObserver which is provided as part of this KIP.

  3. Add simple logic to allow Kafka servers to report auditing statistics. More specifically, change KafkaApis code to invoke the auditor for request-response pairs. Implementation of the interface is provided by users.

Public Interfaces

Observer interface. Observers are instantiated by calling the no-arg constructor, then the configure() method is called, and then close() is called with no further record() calls expected.

  1. :

    Code Block
    languagescala
    titleObserver
    linenumberstrue
     /**

...

  1.  

...

  1. The Observer interface. This class allows user to implement their own auditing solution.

...

  1. 
      * Notice that the Observer may be used by multiple threads, so the implementation should be thread safe.

...

  1.  

...

  1. */
    trait Observer extends Configurable {

...

  1. 
      /*

...

  1. * Record the request and response pair based on the given information.

...

  1.  

...

  1. */
      def record(request: RequestChannel.Request, response: AbstractResponse): Unit

...

  1. 
      /

...

  1. *

...

  1. * Close the observer with timeout.

...

  1. 
        * @param timeout the maximum time to wait to close the observer.
        * @param unit    The time unit.

...

  1.  */
      def close(timeout: Long, unit: TimeUnit): Unit
    

...

Code Block
languagescala
titleConfiguration property
linenumberstrue

/** ********* Broker-side Observer Configuration ****************/
val ObserverClassProp = "observer.class.name"

...

  1. }

...

  1. Observers are instantiated by calling the no-arg constructor, then the configure() method is called, and then close() is called with no further record() calls expected.
    With this interface and the given type of RequestChannel.Request and AbstractResponse, the information we can extract from every produce or fetch request is:

    1. Topic name

    2. Client principal

    3. Client IP address

    4. Number of bytes produced or consumed

    5. Records count (if the batch version is 2 or greater, otherwise a deep decompression is required to get the count)

...

  1. Add a configuration property in the KafkaConfig class to allow users to specify implementations of the Observer interface:

    Code Block
    languagescala
    title

...

  1. Configuration property
    linenumberstrue

...

  1. /** ********* Broker-side Observer 

...

  1. Configuration ****************/
    val ObserverClassProp = "observer.class.names"


  2. Add code to the broker (in KafkaApis) to allow Kafka servers to invoke any observers defined. More specifically, change KafkaApis code to invoke all defined observers, in the order in which they were defined, for every request-response pair. 

Compatibility, Deprecation, and Migration Plan

N/A. The no-operation observer is used No observers are defined by default. Test of any other non-trivial implementation should be done by its implementers. Exceptions thrown by any of the observer APIs (s) should be caught and the functionalities of the broker should not be affected.

...