Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-7596

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

A Kafka audit system is useful to both users and developers of the streaming platform because it reveals how the system is being used. For example, audit information can answer the question of which application produces or consumes how much data to or from which topic(s) over what period of time. Insights derived from audit information open up several opportunities, for example:

  1. Traffic analysis. It allows detailed analysis of traffic on particular topics, for particular principals, or for specific request types.

  2. Correctness analysis and reporting. There can be various definitions of “correctness”. For example, “correctness” could be defined as whether the number of records produced to a Kafka cluster equals the number of records that are mirrored to another cluster.

  3. SLO monitoring. The response time to a request could be broken down by topic, principal, or any other arbitrary criteria.

  4. Cost attribution. For example, the number of bytes produced or consumed could be broken down by topic, principal, or any other arbitrary criteria.

Sources of an audit system are the places where auditing information is initially generated. A Kafka server can be a source since it has access to every request and its response.

Terminology

Auditor: The generator of the initial auditing information. It is usually embedded in another application/service. For example, it could be a pluggable broker component that can inspect all requests and responses on a broker.

Proposed Changes

  1. Define an Observer interface and provide a no-operation observer implementation. The observer can be used to store and report auditing statistics, among other things.

  2. Add a configuration property to the KafkaConfig class that allows users to specify which implementation of the Observer interface to use. The default value of this property is the NoOpObserver, which is provided as part of this KIP.

  3. Add simple logic that allows Kafka servers to report auditing statistics. More specifically, change the KafkaApis code to invoke the observer for each request-response pair. Implementations of the interface are provided by users.

Public Interfaces

Observer interface. An observer is instantiated by calling its no-arg constructor, after which the configure() method is called. record() is then invoked for request-response pairs until close() is called; no further record() calls are expected after close().

Observer
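import java.util.concurrent.TimeUnit

import kafka.network.RequestChannel
import org.apache.kafka.common.Configurable
import org.apache.kafka.common.requests.AbstractResponse

/**
  * The Observer interface. This trait allows users to implement their own auditing solution.
  *
  * Note that the Observer may be used by multiple threads, so implementations should be thread safe.
  */
trait Observer extends Configurable {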


  /**
    * Record the request and response pair based on the given information.
    */
  def record(request: RequestChannel.Request, response: AbstractResponse): Unit

  /**
    * Close the observer with timeout.
    *
    * @param timeout the maximum time to wait to close the observer.
    * @param unit    The time unit.
    */
  def close(timeout: Long, unit: TimeUnit): Unit

}


A configuration property is added to the KafkaConfig class. It specifies the observer class that implements the interface and defaults to the provided no-operation observer class.

Configuration property
/** ********* Broker-side Observer Configuration ****************/
val ObserverClassProp = "observer.class.name"
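
The lifecycle described above (no-arg constructor, then configure()) could be driven from this property roughly as follows. This is only a sketch under stated assumptions: SketchObserver, SketchNoOpObserver and ObserverLoader are hypothetical stand-ins so that the example compiles without Kafka on the classpath, and the real broker wiring may differ.

```scala
import java.util.{Map => JMap}
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the proposed Observer trait, reduced to its
// lifecycle methods so the sketch compiles without Kafka on the classpath.
trait SketchObserver {
  def configure(configs: JMap[String, _]): Unit
  def close(timeout: Long, unit: TimeUnit): Unit
}

// Hypothetical stand-in for the proposed NoOpObserver default.
class SketchNoOpObserver extends SketchObserver {
  override def configure(configs: JMap[String, _]): Unit = ()
  override def close(timeout: Long, unit: TimeUnit): Unit = ()
}

// Hypothetical helper, not part of the KIP.
object ObserverLoader {
  val ObserverClassProp = "observer.class.name"

  // Resolves the configured class name (falling back to the no-op stand-in),
  // instantiates it through its no-arg constructor, and configures it.
  def load(configs: JMap[String, _]): SketchObserver = {
    val className = Option(configs.get(ObserverClassProp))
      .map(_.toString)
      .getOrElse(classOf[SketchNoOpObserver].getName)
    val observer = Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[SketchObserver]
    observer.configure(configs)
    observer
  }
}
```

With an empty configuration map, load() falls back to the no-op stand-in, which mirrors the default proposed in this KIP.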

For example, given the RequestChannel.Request and AbstractResponse types passed to this interface, the information that can be extracted from every produce or fetch request includes:

  1. Topic name

  2. Client principal

  3. Client IP address

  4. Number of bytes produced or consumed

  5. Records count (if the batch version is 2 or greater, otherwise a deep decompression is required to get the count)
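
To illustrate how the fields listed above could feed a cost-attribution style report, the sketch below aggregates bytes per topic and principal in a thread-safe way. AuditEvent and ByteCountingAuditor are hypothetical names introduced only for this example; extracting the fields from RequestChannel.Request and AbstractResponse is deliberately left out, since it depends on broker internals.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

// Hypothetical, simplified view of one produce or fetch request-response pair,
// carrying the five fields listed above.
final case class AuditEvent(
  topic: String,
  principal: String,
  clientIp: String,
  bytes: Long,
  recordCount: Long
)

// Hypothetical aggregator: sums bytes per (topic, principal).
// ConcurrentHashMap and LongAdder keep record() safe to call from multiple threads.
final class ByteCountingAuditor {
  private val bytesByKey = new ConcurrentHashMap[(String, String), LongAdder]()

  def record(event: AuditEvent): Unit =
    bytesByKey
      .computeIfAbsent((event.topic, event.principal), (_: (String, String)) => new LongAdder())
      .add(event.bytes)

  // Returns 0 for a (topic, principal) pair that has not been seen.
  def bytesFor(topic: String, principal: String): Long =
    Option(bytesByKey.get((topic, principal))).map(_.sum()).getOrElse(0L)
}
```

A real Observer implementation could build such an event inside record() and delegate to an aggregator like this one.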

The NoOpObserver implementation is given below:

No-operation observer
import java.util
import java.util.concurrent.TimeUnit

import kafka.network.RequestChannel
import org.apache.kafka.common.requests.AbstractResponse

class NoOpObserver extends Observer {

  /** Nothing to configure for the no-operation observer. */
  override def configure(javaConfigs: util.Map[String, _]): Unit = {
  }

  /**
    * Record the request and response pair based on the given information.
    *
    * @param request  The request to be recorded
    * @param response The response to be recorded
    */
  override def record(request: RequestChannel.Request, response: AbstractResponse): Unit = {
  }

  /**
    * Close the observer with timeout.
    * The no-operation observer holds no resources, so this method returns immediately.
    *
    * @param timeout the maximum time to wait to close the observer.
    * @param unit    The time unit.
    */
  override def close(timeout: Long, unit: TimeUnit): Unit = {
  }
}


Compatibility, Deprecation, and Migration Plan

N/A. The no-operation observer is used by default. Testing of any non-trivial implementation is the responsibility of its implementers. Exceptions thrown by any of the observer methods should be caught so that the functionality of the broker is not affected.
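
One way to keep observer failures from affecting the broker is to wrap every observer call in a small guard. The sketch below is an assumption about how this could look, not the KIP's implementation: SafeObserverCalls and guarded are hypothetical names, and the onError callback stands in for whatever logging the broker would use.

```scala
import scala.util.control.NonFatal

// Hypothetical helper, not part of the KIP.
object SafeObserverCalls {
  // Runs one observer call and hands any non-fatal exception to onError
  // instead of letting it propagate into request handling.
  // Fatal errors (for example OutOfMemoryError) are not caught by NonFatal.
  def guarded(onError: Throwable => Unit)(call: => Unit): Unit =
    try call
    catch { case NonFatal(e) => onError(e) }
}
```

A call site would then look like `SafeObserverCalls.guarded(e => println(e))(observer.record(request, response))`, where the println is a placeholder for real logging.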

Rejected Alternatives

  1. Besides broker-side auditing, the alternative is client-side auditing, in which the auditor is embedded in the producer and consumer. From what we have learned in practice, the major issue with client-side auditing, and one that badly affects the usability of the auditing system, is the lack of central administration over which Kafka clients, and which versions of them, users choose for their applications. There are many Kafka clients (including ones implemented in different programming languages such as Golang, C++ and Python) and many versions of each. This has three direct consequences:

    1. Lack of coverage, due to the sheer number of users, the slow deployment of diverse clients, and the difficulty of enforcing compliance.

    2. Engineering overhead, because supporting diverse clients requires multiple implementations of the auditor.

    3. A slow rate of development of the audit system, again due to the number of clients.

  2. Another alternative is to rely on the existing Kafka server metrics. These provide lower-level insights into the system, such as byte-in/out rates. An audit system complements these operation-level metrics by providing application-level information, such as the byte-in/out rate associated with specific applications.

  3. Since the major motivation is to support an auditing system, the interface could have been named “Auditor”. However, “Observer” is more general and flexible.

  4. Another rejected alternative is to define an interceptor interface instead of an observer interface. Firstly, “interceptor” implies the possibility of modifying data, which “observer” does not. Secondly, apart from that difference, an interceptor is a more general concept than an observer in terms of what it can do. That extra generality complicates the KIP and is not necessary.
