Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-7596

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

A Kafka audit system is useful for both users and developers of the streaming platform because it reveals insights into the system. For example, audit information can answer the question of which application produces or consumes how much data to or from which topic(s) over what period of time. Insights derived from audit information open up opportunities such as the following:

  1. Traffic analysis. It allows detailed analysis of traffic on particular topics, for particular principals, or for specific request types.

  2. Correctness analysis and reporting. “Correctness” can be defined in various ways. For example, it could be defined as whether the number of records produced to a Kafka cluster equals the number of records that are mirrored to another cluster.

  3. SLO monitoring. The response time to a request could be broken down by topic, principal or any other arbitrary criteria.

  4. Cost attribution. For example, the number of bytes produced or consumed could be broken down by topic, principal or any other arbitrary criteria.

Sources of an audit system are where the auditing information is initially generated. The Kafka server can be a source because it has access to every request and its response.

Public interfaces

Define a RequestInfo interface:

RequestInfo Interface
public interface RequestInfo {
  
  /* @return the request header that contains information such as API key, API version, client ID, etc.
   */
  public RequestHeader getHeader();
  
  /* @return IP address of the client that made this request
   */
  public InetAddress getClientAddress();
  
  /* @return principal of the client that made this request
   */
  public KafkaPrincipal getPrincipal();
  
  /* @return size in bytes of the body of this request
   */
  public int getBodySize();
  
  /* This method should only be invoked if the request is of type ApiKeys.PRODUCE. Otherwise, an exception is thrown.
   * @return a map of topic name to the number of bytes that are supposed to be produced to this topic in this request
   */
  public Map<String, Long> getProduceToTopicSizeInBytes();

  /* This method should only be invoked if the request is of type ApiKeys.PRODUCE. Otherwise, an exception is thrown.
   * @return a map of topic name to the number of records that are supposed to be produced to this topic in this request
   */
  public Map<String, Long> getProduceToTopicRecordCount();
}

This interface defines the methods that a user's observer implementation can call to extract information from the request.


Define a ResponseInfo interface:

ResponseInfo Interface
public interface ResponseInfo {
  /* @return a map of error to its count of occurrence.
   */
  public Map<Errors, Integer> getErrorCounts();

  /* This method should only be invoked if the response is of type ApiKeys.FETCH. Otherwise, an exception is thrown.
   * @return a map of topic partition to the number of bytes that are fetched from this topic partition.
   */
  public Map<TopicPartition, Long> getFetchFromTopicPartitionSizeInBytes();
}

This interface defines the methods that a user's observer implementation can call to extract information from the response.


Define RequestAdapter class:

RequestInfo implementation
public class RequestAdapter implements RequestInfo {


	public RequestAdapter(RequestChannel.Request request) {
		// ...
	}
	// ...
}

An implementation of the RequestInfo interface whose constructor takes a request instance. Instances of this class are passed to the observer.


Define ResponseAdapter class:

ResponseInfo implementation
public class ResponseAdapter implements ResponseInfo {


	public ResponseAdapter(AbstractResponse response) {
		// ...
	}
	// ...
}

An implementation of the ResponseInfo interface whose constructor takes a response instance. Instances of this class are passed to the observer.


Define an Observer interface:

Observer
 /** The Observer interface. This interface allows users to implement their own auditing solution.
  * Note that an Observer may be used by multiple threads, so implementations should be thread safe. */
public interface Observer extends Configurable {
  /* Record the request and response pair based on the given information.
   */
  public void record(RequestAdapter request, ResponseAdapter response);

  /* Close the observer with timeout.
   *
   * @param timeout the maximum time to wait to close the observer.
   * @param unit    The time unit.
   */
  public void close(long timeout, TimeUnit unit);
}

Observers are instantiated by calling the no-arg constructor, after which the configure() method is called. The record() method is then invoked for each request-response pair. Finally, close() is called, after which no further record() calls are expected.
With this interface and the given types RequestChannel.Request and AbstractResponse, the information we can extract from every produce or fetch request is:

  1. Topic name

  2. Client principal

  3. Client IP address

  4. Number of bytes produced or consumed

  5. Records count (if the batch version is 2 or greater, otherwise a deep decompression is required to get the count)
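As an illustration of how the fields listed above might be used for cost attribution, the following is a minimal sketch of an observer-style class that accumulates produced bytes per principal and topic. It is not part of the proposal: `ByteCountingObserverSketch`, `RequestView`, `principalName()` and the `request(...)` helper are hypothetical names, and `RequestView` is a trimmed-down stand-in for the proposed RequestInfo so that the example compiles without Kafka on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Sketch only. RequestView is a hypothetical, trimmed-down stand-in for the
// proposed RequestInfo so that the example compiles without Kafka on the classpath.
class ByteCountingObserverSketch {

    interface RequestView {
        String principalName();                         // assumed equivalent of getPrincipal().getName()
        Map<String, Long> produceToTopicSizeInBytes();  // mirrors getProduceToTopicSizeInBytes()
    }

    // Keyed by "principal/topic". ConcurrentHashMap and LongAdder keep record()
    // safe when several threads call it at once.
    private final ConcurrentHashMap<String, LongAdder> bytesProduced = new ConcurrentHashMap<>();

    // Plays the role of Observer.record(), restricted to the request side.
    void record(RequestView request) {
        request.produceToTopicSizeInBytes().forEach((topic, bytes) ->
            bytesProduced
                .computeIfAbsent(request.principalName() + "/" + topic, key -> new LongAdder())
                .add(bytes));
    }

    // Total bytes seen so far for one principal and topic, or 0 if none.
    long bytesFor(String principal, String topic) {
        LongAdder total = bytesProduced.get(principal + "/" + topic);
        return total == null ? 0L : total.sum();
    }

    // Helper for the demonstration: builds a RequestView from plain values.
    static RequestView request(String principal, Map<String, Long> sizes) {
        return new RequestView() {
            @Override public String principalName() { return principal; }
            @Override public Map<String, Long> produceToTopicSizeInBytes() { return sizes; }
        };
    }

    public static void main(String[] args) {
        ByteCountingObserverSketch observer = new ByteCountingObserverSketch();
        observer.record(request("alice", Map.of("orders", 100L)));
        observer.record(request("alice", Map.of("orders", 50L, "clicks", 7L)));
        System.out.println(observer.bytesFor("alice", "orders"));
    }
}
```

A real implementation would implement the proposed Observer interface and would likely flush the counters to an external store periodically and in close(); those parts are left out here.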

Proposed Changes

Add the interfaces and classes described above, plus a configuration property in the KafkaConfig class that lets users specify implementations of the Observer interface. The property takes a list of observer implementations to be invoked.

Configuration property
/** ********* Broker-side Observer Configuration ****************/
val ObserverClassProp = "observer.class.names"
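One way the value of observer.class.names could be turned into observer instances is sketched below. It assumes the value is a comma-separated list of fully qualified class names (the KIP only says "a list") and follows the lifecycle described earlier: no-arg constructor first, then configure(). `ObserverLoaderSketch`, `ConfigurableObserver` and `LoggingObserver` are hypothetical names used only for illustration; the broker's actual loading code may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: shows reflection-based loading, not the broker's real code path.
class ObserverLoaderSketch {

    // Hypothetical stand-in for the proposed Observer, reduced to configure().
    interface ConfigurableObserver {
        void configure(Map<String, ?> configs);
    }

    // Example observer used only for the demonstration.
    static class LoggingObserver implements ConfigurableObserver {
        boolean configured = false;
        @Override public void configure(Map<String, ?> configs) { configured = true; }
    }

    // Instantiates each named class with its no-arg constructor, configures it,
    // and returns the observers in the order in which they were listed.
    static List<ConfigurableObserver> load(String classNames, Map<String, ?> configs) {
        List<ConfigurableObserver> observers = new ArrayList<>();
        for (String name : classNames.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) continue;  // tolerate stray commas and blanks
            try {
                ConfigurableObserver observer = Class.forName(trimmed)
                    .asSubclass(ConfigurableObserver.class)
                    .getDeclaredConstructor()
                    .newInstance();
                observer.configure(configs);
                observers.add(observer);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot load observer " + trimmed, e);
            }
        }
        return observers;
    }

    public static void main(String[] args) {
        String name = LoggingObserver.class.getName();
        List<ConfigurableObserver> observers = load(name + ", " + name, Map.of());
        System.out.println(observers.size() + " observers loaded");
    }
}
```

Preserving list order matters because the proposal invokes observers in the order in which they were defined.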

Add code to the broker (in KafkaApis) to allow Kafka servers to invoke any observers defined. More specifically, change KafkaApis code to invoke all defined observers, in the order in which they were defined, for every request-response pair. For example, in the sendResponse method in the KafkaApis class:

Observers usage
private def sendResponse(request: RequestChannel.Request,
                         responseOpt: Option[AbstractResponse],
                         onComplete: Option[Send => Unit]): Unit = {
  // Update error metrics for each error code in the response including Errors.NONE
  responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))

  val response = responseOpt match {
    case Some(response) =>
      val responseSend = request.context.buildResponse(response)
	  
      // Convert RequestChannel.Request and AbstractResponse into their adapter instances, which implement the request/response info interfaces
      val requestAdapter = new RequestAdapter(request)
      val responseAdapter = new ResponseAdapter(response)
      // Invoke the record method of each observer with the request and response info instances. "observers" is a list of observers, and each of
      // them represents one user-provided implementation of the observer interface.
      for (observer <- observers) {
        try {
          (observer: Observer).record(requestAdapter, responseAdapter)
        } catch {
          case e: Throwable => error(s"Observer failed to record $request and $response", e)
        }
      }

      val responseString =
        if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
        else None
      new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
    case None =>
      new RequestChannel.NoOpResponse(request)
  }
  sendResponse(response)
}

Compatibility, Deprecation, and Migration Plan

N/A. No observers are defined by default. Any non-trivial observer implementation should be tested by its implementers. Exceptions thrown by any observer should be caught and should not affect the functionality of the broker.

Rejected Alternatives

Besides broker-side auditing, the alternative is client-side auditing, in which the auditor is embedded in the producer and consumer. From what we have learned in practice, the major issue with client-side auditing, and one that badly affects the usability of the auditing system, is the lack of central administration over which Kafka clients, and which versions of them, users choose for their applications. There are many Kafka clients (including ones implemented in other programming languages such as Golang, C++ and Python) and many versions of each. This has three direct consequences.

  1. Lack of coverage, due to the sheer number of users, the slow deployment of diverse clients, and the difficulty of enforcing compliance.

  2. Supporting diverse clients requires multiple implementations of the auditor, which adds engineering overhead.

  3. A slow rate of development of the audit system, again due to the number of clients.

The existing Kafka server metrics provide lower-level insights into the system, such as byte-in/out rates. An audit system complements these operation-level server metrics with application-level information, such as the byte-in/out rate associated with specific applications.

Since the major motivation is to support an auditing system, the interface could be named “Auditor”. However, “Observer” is more general and flexible.

Another rejected alternative is to define an interceptor interface instead of an observer interface. Firstly, “interceptor” implies the possibility of modifying data, which “observer” does not. Secondly, apart from that difference, the concept of an interceptor is more general than that of an observer in terms of what could be done. This extra generality complicates the KIP and is not necessary.
