...

The sources of an audit system are where auditing information is first generated. A Kafka broker is a natural source because it sees every request and its response.

Public Interfaces

Define a RequestInfo interface:

Code Block
import java.net.InetAddress;
import java.util.Map;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

public interface RequestInfo {

  /** @return the request header, which contains information such as the API key, API version and client ID
   */
  public RequestHeader getHeader();

  /** @return IP address of the client that made this request
   */
  public InetAddress getClientAddress();

  /** @return principal of the client that made this request
   */
  public KafkaPrincipal getPrincipal();

  /** @return size in bytes of the body of this request
   */
  public int getBodySize();

  /** Only valid for requests of type ApiKeys.PRODUCE; an exception is thrown for any other request type.
   * @return a map of topic name to the number of bytes to be produced to this topic in this request
   */
  public Map<String, Long> getProduceToTopicSizeInBytes();

  /** Only valid for requests of type ApiKeys.PRODUCE; an exception is thrown for any other request type.
   * @return a map of topic name to the number of records to be produced to this topic in this request
   */
  public Map<String, Long> getProduceToTopicRecordCount();
}

This interface defines the methods that a user-provided observer implementation can call to extract information from a request.
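As an illustration of how an observer might use RequestInfo, here is a minimal sketch that tallies produced bytes per principal and topic. It assumes simplified stand-in types: ApiKey and RequestInfoLite (with apiKey() and principalName() in place of getHeader() and getPrincipal()) are hypothetical and are not part of this proposal or of Kafka. The sketch checks the API key before calling the PRODUCE-only method, and it uses ConcurrentHashMap with LongAdder because an observer may be called from multiple threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for ApiKeys, reduced to the two request types discussed here.
enum ApiKey { PRODUCE, FETCH }

// Hypothetical, simplified stand-in for RequestInfo: the header is reduced to its API key
// and the KafkaPrincipal to its string form (e.g. "User:alice").
interface RequestInfoLite {
    ApiKey apiKey();
    String principalName();
    Map<String, Long> getProduceToTopicSizeInBytes();
}

// Tallies produced bytes per principal and topic. ConcurrentHashMap plus LongAdder keeps
// updates safe when record() is invoked from several threads at once.
class ProducedBytesTracker {
    private final ConcurrentHashMap<String, LongAdder> bytesByPrincipalAndTopic = new ConcurrentHashMap<>();

    void onRequest(RequestInfoLite request) {
        // The per-topic size map is only valid for PRODUCE requests, so skip everything else.
        if (request.apiKey() != ApiKey.PRODUCE) {
            return;
        }
        for (Map.Entry<String, Long> entry : request.getProduceToTopicSizeInBytes().entrySet()) {
            bytesByPrincipalAndTopic
                .computeIfAbsent(key(request.principalName(), entry.getKey()), k -> new LongAdder())
                .add(entry.getValue());
        }
    }

    long bytesFor(String principal, String topic) {
        LongAdder adder = bytesByPrincipalAndTopic.get(key(principal, topic));
        return adder == null ? 0L : adder.sum();
    }

    // Topic names cannot contain '/', so the last '/' unambiguously separates the two parts.
    private static String key(String principal, String topic) {
        return principal + "/" + topic;
    }
}
```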


Define a ResponseInfo interface:

Code Block
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchResponse;

public interface ResponseInfo {
  /** @return a map from each error to the number of times it occurs in this response
   */
  public Map<Errors, Integer> getErrorCounts();

  /** Only valid for responses to requests of type ApiKeys.FETCH; an exception is thrown for any other request type.
   * @return a map of topic partition to the data fetched from this topic partition
   */
  public Map<TopicPartition, FetchResponse.PartitionData> getFetchResponseData();
}

This interface defines the methods that a user-provided observer implementation can call to extract information from a response.
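A minimal sketch of how an observer might consume getErrorCounts(): counting the non-NONE errors in one response and merging each response's counts into a running total. ErrorCode and ResponseInfoLite are hypothetical stand-ins for Kafka's Errors enum and the proposed ResponseInfo interface, and ErrorSummary is a hypothetical helper.

```java
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.protocol.Errors (only a few values).
enum ErrorCode { NONE, UNKNOWN_TOPIC_OR_PARTITION, TOPIC_AUTHORIZATION_FAILED }

// Hypothetical, simplified stand-in for the proposed ResponseInfo interface.
interface ResponseInfoLite {
    Map<ErrorCode, Integer> getErrorCounts();
}

class ErrorSummary {
    // Total number of error occurrences in one response, ignoring ErrorCode.NONE.
    static int failureCount(ResponseInfoLite response) {
        int failures = 0;
        for (Map.Entry<ErrorCode, Integer> entry : response.getErrorCounts().entrySet()) {
            if (entry.getKey() != ErrorCode.NONE) {
                failures += entry.getValue();
            }
        }
        return failures;
    }

    // Adds one response's per-error counts into a caller-owned running total.
    static void accumulate(Map<ErrorCode, Integer> totals, ResponseInfoLite response) {
        response.getErrorCounts().forEach((error, count) -> totals.merge(error, count, Integer::sum));
    }
}
```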


Define RequestAdapter class:

Code Block
public class RequestAdapter implements RequestInfo {

  public RequestAdapter(RequestChannel.Request request) {
    // ...
  }
  // ...
}

An implementation of the RequestInfo interface whose constructor takes a RequestChannel.Request instance. The broker passes instances of this class to the observers.
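The adapter's role can be sketched as follows: it translates a broker-internal request into the stable view exposed to observers and enforces the PRODUCE-only contract of the per-topic methods. This is not the proposed implementation. InternalRequest, PartitionBatch, ProduceView and RequestAdapterSketch are hypothetical stand-ins for RequestChannel.Request, its record batches, RequestInfo and RequestAdapter, and IllegalStateException is an assumed choice for "an exception is thrown".

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the request's API key.
enum ApiKind { PRODUCE, FETCH }

// Hypothetical stand-in for one partition's batch inside a broker-internal request.
final class PartitionBatch {
    final String topic;
    final long sizeInBytes;
    final long recordCount;

    PartitionBatch(String topic, long sizeInBytes, long recordCount) {
        this.topic = topic;
        this.sizeInBytes = sizeInBytes;
        this.recordCount = recordCount;
    }
}

// Hypothetical stand-in for RequestChannel.Request.
final class InternalRequest {
    final ApiKind apiKind;
    final List<PartitionBatch> batches;

    InternalRequest(ApiKind apiKind, List<PartitionBatch> batches) {
        this.apiKind = apiKind;
        this.batches = batches;
    }
}

// Subset of the proposed RequestInfo that observers would see.
interface ProduceView {
    Map<String, Long> getProduceToTopicSizeInBytes();
    Map<String, Long> getProduceToTopicRecordCount();
}

// Adapter: exposes per-topic totals without leaking the internal request type.
final class RequestAdapterSketch implements ProduceView {
    private final InternalRequest request;

    RequestAdapterSketch(InternalRequest request) {
        this.request = request;
    }

    private void requireProduce() {
        if (request.apiKind != ApiKind.PRODUCE) {
            throw new IllegalStateException("Only valid for PRODUCE requests, got " + request.apiKind);
        }
    }

    @Override
    public Map<String, Long> getProduceToTopicSizeInBytes() {
        requireProduce();
        Map<String, Long> result = new HashMap<>();
        for (PartitionBatch batch : request.batches) {
            result.merge(batch.topic, batch.sizeInBytes, Long::sum); // sum across partitions
        }
        return result;
    }

    @Override
    public Map<String, Long> getProduceToTopicRecordCount() {
        requireProduce();
        Map<String, Long> result = new HashMap<>();
        for (PartitionBatch batch : request.batches) {
            result.merge(batch.topic, batch.recordCount, Long::sum);
        }
        return result;
    }
}
```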


Define ResponseAdapter class:

Code Block
public class ResponseAdapter implements ResponseInfo {

  public ResponseAdapter(AbstractResponse response) {
    // ...
  }
  // ...
}

An implementation of the ResponseInfo interface whose constructor takes an AbstractResponse instance. The broker passes instances of this class to the observers.
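On the fetch side, an observer could roll the per-partition data returned by getFetchResponseData() up to bytes consumed per topic. The sketch below assumes hypothetical stand-ins: TopicPartitionRef for TopicPartition, and FetchedPartition, carrying only the size of the returned records, for FetchResponse.PartitionData. ConsumedBytes is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartitionRef {
    final String topic;
    final int partition;

    TopicPartitionRef(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionRef)) return false;
        TopicPartitionRef other = (TopicPartitionRef) o;
        return topic.equals(other.topic) && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical stand-in for FetchResponse.PartitionData: only the size of the fetched records.
final class FetchedPartition {
    final long recordsSizeInBytes;

    FetchedPartition(long recordsSizeInBytes) {
        this.recordsSizeInBytes = recordsSizeInBytes;
    }
}

class ConsumedBytes {
    // Sums the fetched bytes of every partition belonging to the same topic.
    static Map<String, Long> perTopic(Map<TopicPartitionRef, FetchedPartition> fetchData) {
        Map<String, Long> result = new HashMap<>();
        fetchData.forEach((tp, data) -> result.merge(tp.topic, data.recordsSizeInBytes, Long::sum));
        return result;
    }
}
```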


Define an Observer interface:

Code Block
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Configurable;

/** The Observer interface. This interface allows users to implement their own auditing solution.
 * Note that an Observer may be called from multiple threads, so implementations must be thread safe. */
public interface Observer extends Configurable {
  /** Record the request and response pair based on the given information.
   */
  public void record(RequestAdapter request, ResponseAdapter response);

  /** Close the observer with timeout.
   *
   * @param timeout the maximum time to wait to close the observer.
   * @param unit    the time unit of timeout.
   */
  public void close(long timeout, TimeUnit unit);
}

The broker instantiates each observer through its no-arg constructor and then calls configure(). After that, record() is called for each request-response pair, and finally close() is called; no record() calls are expected after close().
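This lifecycle can be sketched from the broker's side as follows, assuming simplified stand-ins: ConfigurableLite mirrors the shape of Kafka's Configurable, ObserverLite reduces record() to string arguments, and CountingObserver and ObserverLifecycle are hypothetical. Each class is instantiated by reflection through its public no-arg constructor, configure() is called next, and close() is called once no more record() calls will be made.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in with the same shape as org.apache.kafka.common.Configurable.
interface ConfigurableLite {
    void configure(Map<String, ?> configs);
}

// Hypothetical, simplified Observer: request and response are reduced to strings.
interface ObserverLite extends ConfigurableLite {
    void record(String request, String response);
    void close(long timeout, TimeUnit unit);
}

// Example observer; the public no-arg constructor is what reflective instantiation relies on.
class CountingObserver implements ObserverLite {
    final AtomicInteger recorded = new AtomicInteger();
    volatile Map<String, ?> configs;
    volatile boolean closed;

    public CountingObserver() {}

    @Override public void configure(Map<String, ?> configs) { this.configs = configs; }
    @Override public void record(String request, String response) { recorded.incrementAndGet(); }
    @Override public void close(long timeout, TimeUnit unit) { closed = true; }
}

class ObserverLifecycle {
    // Steps 1 and 2: no-arg constructor, then configure(). Step 3 (record) happens per request.
    static List<ObserverLite> create(List<String> classNames, Map<String, ?> configs) {
        List<ObserverLite> observers = new ArrayList<>();
        for (String name : classNames) {
            try {
                Class<?> cls = Class.forName(name);
                if (!ObserverLite.class.isAssignableFrom(cls)) {
                    throw new IllegalArgumentException(name + " does not implement the observer interface");
                }
                ObserverLite observer = (ObserverLite) cls.getDeclaredConstructor().newInstance();
                observer.configure(configs);
                observers.add(observer);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot instantiate observer " + name, e);
            }
        }
        return observers;
    }

    // Step 4: close every observer; record() must not be called on them afterwards.
    static void closeAll(List<ObserverLite> observers, long timeout, TimeUnit unit) {
        for (ObserverLite observer : observers) {
            observer.close(timeout, unit);
        }
    }
}
```

Failing fast on a class that cannot be loaded or does not implement the interface is an assumed choice here; a broker could instead log the problem and skip that observer.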
Through the Observer interface and the adapters that wrap RequestChannel.Request and AbstractResponse, an observer can extract the following information from every produce or fetch request:

  1. Topic name

  2. Client principal

  3. Client IP address

  4. Number of bytes produced or consumed

  5. Record count (available directly if the record batch version is 2 or greater; otherwise the batches must be fully decompressed to count the records)
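To illustrate how these fields fit together, the sketch below joins the per-topic byte and record-count maps of a produce request with the request-level principal and client address, producing one audit entry per topic. AuditEvent and AuditEvents are hypothetical names, and the principal is simplified to a string.

```java
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical audit entry holding the fields listed above for a single topic.
final class AuditEvent {
    final String topic;
    final String principal;
    final InetAddress clientAddress;
    final long bytes;
    final long records;

    AuditEvent(String topic, String principal, InetAddress clientAddress, long bytes, long records) {
        this.topic = topic;
        this.principal = principal;
        this.clientAddress = clientAddress;
        this.bytes = bytes;
        this.records = records;
    }
}

class AuditEvents {
    // Builds one event per topic from maps shaped like getProduceToTopicSizeInBytes()
    // and getProduceToTopicRecordCount(); a topic missing from the count map gets 0 records.
    static List<AuditEvent> fromProduce(String principal, InetAddress clientAddress,
                                        Map<String, Long> bytesPerTopic, Map<String, Long> recordsPerTopic) {
        List<AuditEvent> events = new ArrayList<>();
        for (Map.Entry<String, Long> entry : bytesPerTopic.entrySet()) {
            long records = recordsPerTopic.getOrDefault(entry.getKey(), 0L);
            events.add(new AuditEvent(entry.getKey(), principal, clientAddress, entry.getValue(), records));
        }
        return events;
    }
}
```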

Proposed Changes

Add

...

interfaces and implementations for the classes described above, and a configuration property in the KafkaConfig class that lets users specify implementations of the observer interface. The property takes a list of observer implementations, all of which are invoked by the broker.

Code Block
/** ********* Broker-side Observer Configuration ****************/
val ObserverClassProp = "observer.class.names"
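How the broker might turn this property into an ordered list of class names can be sketched as below, assuming the value is a comma-separated string such as observer.class.names=com.example.AuditObserver,com.example.MetricsObserver (hypothetical class names). ObserverConfig is a hypothetical helper; in KafkaConfig itself this parsing could instead be left to a LIST-typed ConfigDef entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ObserverConfig {
    static final String OBSERVER_CLASS_PROP = "observer.class.names";

    // Returns the observer class names in the order they were listed, with surrounding
    // whitespace trimmed and empty entries dropped; empty if the property is unset.
    static List<String> observerClassNames(Map<String, String> brokerProps) {
        List<String> names = new ArrayList<>();
        String raw = brokerProps.get(OBSERVER_CLASS_PROP);
        if (raw == null) {
            return names;
        }
        for (String part : raw.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }
}
```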

Add code to the broker (in KafkaApis) so that Kafka servers invoke the configured observers. Specifically, change KafkaApis to invoke every configured observer, in the order in which the observers were defined, for every request-response pair. For example, in the sendResponse method of the KafkaApis class:

Code Block
private def sendResponse(request: RequestChannel.Request,
                         responseOpt: Option[AbstractResponse],
                         onComplete: Option[Send => Unit]): Unit = {
  // Update error metrics for each error code in the response including Errors.NONE
  responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))

  val response = responseOpt match {
    case Some(response) =>
      val responseSend = request.context.buildResponse(response)

      // Wrap RequestChannel.Request and AbstractResponse in adapter instances that implement the request/response info interfaces
      val requestAdapter = new RequestAdapter(request)
      val responseAdapter = new ResponseAdapter(response)
      // Invoke record() on every observer, in the configured order. "observers" is the list of observer
      // instances, each a user-provided implementation of the Observer interface.
      for (observer <- observers) {
        try {
          observer.record(requestAdapter, responseAdapter)
        } catch {
          case e: Throwable => error(s"Observer failed to record $request and $response", e)
        }
      }

      val responseString =
        if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
        else None
      new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
    case None =>
      new RequestChannel.NoOpResponse(request)
  }
  sendResponse(response)
}
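The dispatch loop in sendResponse can be sketched in Java to show the two properties it relies on: observers run in their configured order, and a failure in one observer is logged without affecting the others or the response being sent. RecordingObserver and ObserverDispatcher are hypothetical stand-ins, and the error logger is passed in as a BiConsumer.

```java
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified Observer: only record() matters for dispatch.
interface RecordingObserver {
    void record(String request, String response);
}

class ObserverDispatcher {
    private final List<RecordingObserver> observers;
    private final BiConsumer<String, RuntimeException> errorLog;

    ObserverDispatcher(List<RecordingObserver> observers, BiConsumer<String, RuntimeException> errorLog) {
        this.observers = List.copyOf(observers); // immutable copy that keeps the configured order
        this.errorLog = errorLog;
    }

    // Calls every observer in order; an exception from one observer is logged and
    // never stops the remaining observers or propagates to the caller.
    void recordAll(String request, String response) {
        for (RecordingObserver observer : observers) {
            try {
                observer.record(request, response);
            } catch (RuntimeException e) {
                errorLog.accept("Observer failed to record " + request + " and " + response, e);
            }
        }
    }
}
```

This sketch catches only RuntimeException, whereas the KafkaApis snippet catches every Throwable, which also swallows fatal JVM errors such as OutOfMemoryError. Which behavior is preferable is a design decision for the implementation.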

Compatibility, Deprecation, and Migration Plan

...