...
Traffic analysis. It allows detailed analysis of traffic on particular topics, for particular principals, or for specific request types.
Correctness analysis and reporting. “Correctness” can be defined in various ways. For example, it could be defined as whether the number of records produced to a Kafka cluster equals the number of records mirrored to another cluster.
SLO monitoring. The response time to a request could be broken down by topic, principal, or any other arbitrary criteria.
Cost attribution. For example, the number of bytes produced or consumed could be broken down by topic, principal, or any other arbitrary criteria.
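The record-count definition of correctness mentioned above can be sketched as a simple comparison. This is only an illustration: `MirrorCheck` and its inputs are hypothetical, and it assumes per-topic record counts for both clusters have already been collected by some audit pipeline.

```scala
// Hypothetical helper, not part of this proposal.
object MirrorCheck {
  // Returns topic -> (produced, mirrored) for every topic whose counts differ.
  // A topic missing from one side is treated as having a count of 0 there.
  def mismatches(produced: Map[String, Long],
                 mirrored: Map[String, Long]): Map[String, (Long, Long)] =
    (produced.keySet ++ mirrored.keySet).iterator
      .map(topic => (topic, (produced.getOrElse(topic, 0L), mirrored.getOrElse(topic, 0L))))
      .filter { case (_, (p, m)) => p != m }
      .toMap
}
```

An empty result would mean the two clusters agree under this particular definition; other definitions of correctness would need a different check.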
The sources of an audit system are where the auditing information is initially generated. The Kafka server could be a source, since it can access every request and its response.
...
Public interfaces
Define an Observer interface:
```scala
import java.util.concurrent.TimeUnit

import kafka.network.RequestChannel
import org.apache.kafka.common.Configurable
import org.apache.kafka.common.requests.AbstractResponse

/**
 * The Observer interface. This trait allows users to implement their own auditing solution.
 * Notice that the Observer may be used by multiple threads, so the implementation should be thread safe.
 */
trait Observer extends Configurable {

  /** Record the request and response pair based on the given information. */
  def record(request: RequestChannel.Request, response: AbstractResponse): Unit

  /**
   * Close the observer with timeout.
   * @param timeout the maximum time to wait to close the observer.
   * @param unit the time unit.
   */
  def close(timeout: Long, unit: TimeUnit): Unit
}
```
Observers are instantiated by calling the no-arg constructor, after which the configure() method is called. Finally, close() is called, after which no further record() calls are expected.
With this interface and the given types RequestChannel.Request and AbstractResponse, the information we can extract from every produce or fetch request is:
Topic name
Client principal
Client IP address
Number of bytes produced or consumed
Record count (if the batch version is 2 or greater; otherwise a deep decompression is required to get the count)
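As a rough illustration of how an implementation might use these fields, the sketch below aggregates bytes per topic and principal. It deliberately does not use the real Kafka classes: `AuditEvent`, `SimpleObserver`, and `ByteCountObserver` are hypothetical stand-ins for the extracted information and for the proposed trait, chosen so that the snippet is self-contained.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

// Hypothetical stand-in for the fields extractable from a request/response pair.
final case class AuditEvent(topic: String, principal: String, clientIp: String, bytes: Long)

// Hypothetical, simplified counterpart of the proposed Observer trait.
trait SimpleObserver {
  def record(event: AuditEvent): Unit
  def close(): Unit
}

// Aggregates bytes per (topic, principal). The concurrent map and LongAdder
// keep it safe to call record() from multiple threads.
final class ByteCountObserver extends SimpleObserver {
  private val counters = new ConcurrentHashMap[(String, String), LongAdder]()

  override def record(event: AuditEvent): Unit =
    counters
      .computeIfAbsent((event.topic, event.principal), (_: (String, String)) => new LongAdder())
      .add(event.bytes)

  // Total bytes seen so far for the given topic and principal (0 if none).
  def bytesFor(topic: String, principal: String): Long = {
    val adder = counters.get((topic, principal))
    if (adder == null) 0L else adder.sum()
  }

  override def close(): Unit = counters.clear()
}
```

A real implementation would extend the Observer trait and derive these fields from RequestChannel.Request and AbstractResponse; how exactly each field is obtained depends on the broker version and is not shown here.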
Proposed Changes
Add the observer interface described above and a configuration property in the KafkaConfig class that allows users to specify implementations of the observer interface. The configuration property lets users define a list of observer implementations to be invoked.
```scala
/** ********* Broker-side Observer Configuration ****************/
val ObserverClassProp = "observer.class.names"
```
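For illustration, a broker configuration using this property might look like the fragment below. The class names are hypothetical, and the comma separator is an assumption based on how Kafka's list-typed configuration properties are usually written; the proposal itself only says the property holds a list of implementations.

```properties
# Hypothetical observer classes; they must be on the broker classpath
# and provide a no-arg constructor.
observer.class.names=com.example.audit.ByteCountObserver,com.example.audit.LatencyObserver
```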
Add code to the broker (in KafkaApis) to allow Kafka servers to invoke any observers defined. More specifically, change KafkaApis code to invoke all defined observers, in the order in which they were defined, for every request-response pair.
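The invocation described above might be sketched as follows. This is not the actual KafkaApis change: `PairObserver` and `ObserverDispatcher` are hypothetical names, and requests and responses are reduced to strings to keep the example self-contained. The sketch also isolates failures per observer, in line with this proposal's requirement that observer exceptions must not affect the broker.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for an observer of request/response pairs.
trait PairObserver {
  def record(request: String, response: String): Unit
}

// Hypothetical dispatcher, not part of this proposal.
object ObserverDispatcher {
  // Calls every observer in the order given. An exception thrown by one
  // observer is caught so that later observers still run.
  // Returns the number of observers that failed.
  def dispatch(observers: Seq[PairObserver], request: String, response: String): Int = {
    var failures = 0
    observers.foreach { observer =>
      try observer.record(request, response)
      catch {
        case NonFatal(_) =>
          failures += 1 // a real broker would presumably log the error here
      }
    }
    failures
  }
}
```

Catching only non-fatal exceptions is a design choice of this sketch; whether fatal errors should also be contained is left open by the proposal.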
Compatibility, Deprecation, and Migration Plan
N/A. No observers are defined by default. Testing of any non-trivial implementation should be done by its implementers. Exceptions thrown by any observer should be caught so that the functionality of the broker is not affected.
Rejected Alternatives
Besides broker-side auditing, the alternative approach is client-side auditing, in which the auditor is embedded in the producer and consumer. From what we have learned in practice, the major issue with client-side auditing, and one that badly affects the usability of the auditing system, is the lack of central administration over which Kafka clients, and which versions of them, users choose for their applications. Because there are many Kafka clients (including ones implemented in different programming languages such as Golang, C++, and Python) and many versions, there are three direct consequences.
Lack of coverage, due to the sheer number of users, the slow deployment of diverse clients, and the difficulty of enforcing compliance.
Engineering overhead, because supporting diverse clients requires providing various implementations of the auditor.
A slow rate of development of the audit system, again due to the number of clients.
The existing Kafka server metrics provide lower-level insights into the system, such as byte-in/out rates. An audit system complements these operation-level server metrics by providing additional application-level information, such as the byte-in/out rate associated with specific applications.
Since the major motivation is to support the auditing system, the interface could be named “Auditor”. However, “Observer” is more general and flexible.
Another rejected alternative is to define an interceptor interface instead of an observer interface. Firstly, “interceptor” implies the possibility of modifying data, which “observer” does not. Secondly, apart from that difference, the concept of an interceptor is more general than that of an observer in terms of what could be done. However, the extra generality complicates the KIP and is not necessary.