Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Add one specific use case in the motivation section.

...

  1. Traffic analysis. It allows detailed analysis of traffic on particular topics, for particular principals, or for specific request types. Useful information can be derived from the audit information. For example, users can easily figure out which topics have not been accessed for a certain period of time. With this information, users can initiate a topic deletion process, etc.

  2. Correctness analysis and reporting. There could be various definitions of “correctness”. For example, “correctness” could be defined as whether the number of records produced to a Kafka cluster equals the number of records that are mirrored to another cluster.

  3. SLO monitoring. The response time to a request could be broken down by topic, principal or any other arbitrary criteria.

  4. Cost attribution. For example, the number of bytes produced or consumed could be broken down by topic, principal or any other arbitrary criteria.

...

Code Block
languagejava
titleObservableStruct Interface
linenumberstrue
collapsetrue
/**
 * Accessor-only view of a struct: every method declared here looks a value up
 * (by field definition, bound field, or field name), tests for a field's
 * presence, or reports the struct's size. No mutating method is declared.
 */
public interface ObservableStruct {

  // ---------------------------------------------------------------------
  // Typed lookups keyed by a field definition
  // ---------------------------------------------------------------------

  /** Looks up the value of an 8-bit integer field. */
  Byte get(Field.Int8 field);

  /** Looks up the value of a 32-bit integer field. */
  Integer get(Field.Int32 field);

  /** Looks up the value of a 64-bit integer field. */
  Long get(Field.Int64 field);

  /** Looks up the value of a 16-bit integer field. */
  Short get(Field.Int16 field);

  /** Looks up the value of a string field. */
  String get(Field.Str field);

  /** Looks up the value of a nullable string field. */
  String get(Field.NullableStr field);

  // ---------------------------------------------------------------------
  // Typed lookups with a caller-supplied alternative
  // NOTE(review): presumably {@code alternative} is returned when the field
  // is not present in the struct -- confirm against the implementation.
  // ---------------------------------------------------------------------

  /** 64-bit integer lookup with a caller-supplied alternative. */
  Long getOrElse(Field.Int64 field, long alternative);

  /** 16-bit integer lookup with a caller-supplied alternative. */
  Short getOrElse(Field.Int16 field, short alternative);

  /** 8-bit integer lookup with a caller-supplied alternative. */
  Byte getOrElse(Field.Int8 field, byte alternative);

  /** 32-bit integer lookup with a caller-supplied alternative. */
  Integer getOrElse(Field.Int32 field, int alternative);

  /** Nullable string lookup with a caller-supplied alternative. */
  String getOrElse(Field.NullableStr field, String alternative);

  /** String lookup with a caller-supplied alternative. */
  String getOrElse(Field.Str field, String alternative);

  // ---------------------------------------------------------------------
  // Presence checks
  // ---------------------------------------------------------------------

  /** Presence check keyed by field name. */
  boolean hasField(String name);

  /** Presence check keyed by field definition. */
  boolean hasField(Field def);

  // ---------------------------------------------------------------------
  // Lookups keyed by bound field or by field name
  // ---------------------------------------------------------------------

  /** Byte lookup by bound field. */
  Byte getByte(BoundField field);

  /**
   * Byte lookup by field name.
   * NOTE(review): this is the only accessor that returns a primitive rather
   * than a boxed value -- confirm that the asymmetry is intended.
   */
  byte getByte(String name);

  /** Record-set lookup by field name. */
  BaseRecords getRecords(String name);

  /** Short lookup by bound field. */
  Short getShort(BoundField field);

  /** Short lookup by field name. */
  Short getShort(String name);

  /** Int lookup by bound field. */
  Integer getInt(BoundField field);

  /** Int lookup by field name. */
  Integer getInt(String name);

  /** Unsigned-int lookup by field name; the result is typed as {@code Long}. */
  Long getUnsignedInt(String name);

  /** Long lookup by bound field. */
  Long getLong(BoundField field);

  /** Long lookup by field name. */
  Long getLong(String name);

  /** String lookup by bound field. */
  String getString(BoundField field);

  /** String lookup by field name. */
  String getString(String name);

  /** Boolean lookup by bound field. */
  Boolean getBoolean(BoundField field);

  /** Boolean lookup by field name. */
  Boolean getBoolean(String name);

  /** Raw-bytes lookup by bound field. */
  ByteBuffer getBytes(BoundField field);

  /** Raw-bytes lookup by field name. */
  ByteBuffer getBytes(String name);

  // ---------------------------------------------------------------------
  // Size
  // ---------------------------------------------------------------------

  /** Reports the size of this struct. */
  int sizeOf();
}

...

Code Block
languagescala
titleConfiguration propertyHow request is recorded
linenumberstrue
// Invoked on every RequestContext instance; a RequestContext instance is constructed
// whenever SocketServer receives a request.
public RequestAndSize parseRequest(ByteBuffer buffer) {
    // Unsupported ApiVersions requests are handled as v0 requests and are never parsed.
    if (isUnsupportedApiVersionsRequest()) {
        ApiVersionsRequest v0Request = new ApiVersionsRequest((short) 0, header.apiVersion());
        return new RequestAndSize(v0Request, 0);
    }

    ApiKeys apiKey = header.apiKey();
    try {
        short apiVersion = header.apiVersion();
        Struct struct = apiKey.parseRequest(apiVersion, buffer);

        // Run the observing logic only when at least one observer implementation is provided.
        boolean hasObservers = observers != null && !observers.isEmpty();
        if (hasObservers) {
            // Wrap the Struct once; the same wrapper is handed to every observer.
            ObservableStructImpl observable = new ObservableStructImpl(struct);
            for (Observer each : observers) {
                each.recordRequest(observable, clientAddress, principal, apiKey, apiVersion, connectionId);
            }
        }

        AbstractRequest parsedBody = AbstractRequest.parseRequest(apiKey, apiVersion, struct);
        return new RequestAndSize(parsedBody, struct.sizeOf());
    } catch (Throwable ex) {
        // Any failure above (parsing or observing) is reported as an invalid request.
        String details = "Error getting request for apiKey: " + apiKey +
                ", apiVersion: " + header.apiVersion() +
                ", connectionId: " + connectionId +
                ", listenerName: " + listenerName +
                ", principal: " + principal;
        throw new InvalidRequestException(details, ex);
    }
}

...

Code Block
languagescala
titleObservers usageHow response is recorded
linenumberstrue
// Invoked for every response: serializes it (which also notifies the observers)
// and wraps the resulting buffer in a NetworkSend addressed to the destination.
protected Send toSend(String destination, ResponseHeader header, short apiVersion, List<Observer> observers) {
    ByteBuffer payload = serialize(destination, apiVersion, header, observers);
    return new NetworkSend(destination, payload);
}

/**
 * Builds the struct form of this response, hands it to each observer, and then
 * serializes the response header together with that struct.
 *
 * @param connectionId   identifier forwarded to each observer's recordResponse call
 * @param version        version used to build the struct and forwarded to the observers
 * @param responseHeader header whose struct form is serialized ahead of the body
 * @param observers      observers to notify; may be null or empty, in which case none are called
 * @return the buffer produced by serializing the header struct and the response struct
 */
public ByteBuffer serialize(String connectionId, short version, ResponseHeader responseHeader, List<Observer> observers) {
    Struct body = toStruct(version);

    // Run the observing logic only when at least one observer implementation is provided.
    boolean hasObservers = observers != null && !observers.isEmpty();
    if (hasObservers) {
        // One wrapper instance is shared by all observers.
        ObservableStructImpl observable = new ObservableStructImpl(body);
        for (Observer each : observers) {
            each.recordResponse(observable, version, connectionId);
        }
    }

    return serialize(responseHeader.toStruct(), body);
}

...

Code Block
languagescala
titleObservers usageObserver Partial Implementation Example
linenumberstrue

/**
 * Example observer callback for requests. For produce requests it tallies, per
 * topic, the size in bytes of every record set found in the request struct.
 * Other request types are left as placeholders.
 *
 * @param struct        observable view of the parsed request
 * @param clientAddress address of the client (unused in this partial example)
 * @param principal     principal of the client (unused in this partial example)
 * @param apiKey        request type; only {@code ApiKeys.PRODUCE} is handled here
 * @param apiVersion    request version (unused in this partial example)
 * @param correlationId identifier supplied by the caller (unused in this partial example)
 */
public void recordRequest(ObservableStructImpl struct,
                          InetAddress clientAddress,
                          KafkaPrincipal principal,
                          ApiKeys apiKey,
                          short apiVersion,
                          String correlationId) {

  if (apiKey == ApiKeys.PRODUCE) {
    // The information extracted from the ObservableStructImpl
    Map<String, Integer> topicProduceBytesCount = new HashMap<>();

    for (ObservableStructImpl topicDataStruct : struct.getObservableStructImplArray("topic_data")) {

      String topic = topicDataStruct.get(TOPIC_NAME);

      for (ObservableStructImpl partitionResponseStruct : topicDataStruct.getObservableStructImplArray("data")) {

        MemoryRecords records = (MemoryRecords) partitionResponseStruct.getRecords("record_set");

        // The first record set seen for a topic has no map entry yet, so start
        // from zero instead of unboxing the null returned by get().
        Integer bytesSoFar = topicProduceBytesCount.get(topic);
        int updatedBytes = (bytesSoFar == null ? 0 : bytesSoFar) + records.sizeInBytes();
        topicProduceBytesCount.put(topic, updatedBytes);
      }
    }
  } else if (apiKey == ApiKeys.FETCH) {
    // ...
  }
}

/**
 * Example observer callback for responses. It tallies, per topic, the size in
 * bytes of every record set found under "responses" / "partition_responses"
 * in the response struct.
 *
 * @param struct        observable view of the response
 * @param apiVersion    response version (unused in this partial example)
 * @param correlationId identifier the example intends to use to determine which
 *                      request type this response belongs to (logic elided)
 * @throws IllegalStateException if a record set is not a {@code MemoryRecords}
 */
public void recordResponse(ObservableStructImpl struct, short apiVersion, String correlationId) {

  // ...
  // Use correlationId to figure out to what request type this response is
  // ...
  // The information extracted from the ObservableStructImpl
  Map<String, Integer> topicFetchBytesCount = new HashMap<>();

  for (ObservableStructImpl topicDataStruct : struct.getObservableStructImplArray("responses")) {
    String topic = topicDataStruct.get(TOPIC_NAME);

    for (ObservableStructImpl partitionResponseStruct : topicDataStruct.getObservableStructImplArray("partition_responses")) {
      BaseRecords baseRecords = partitionResponseStruct.getRecords("record_set");
      if (!(baseRecords instanceof MemoryRecords))
        throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
      MemoryRecords records = (MemoryRecords) baseRecords;

      // The first record set seen for a topic has no map entry yet, so start
      // from zero instead of unboxing the null returned by get().
      Integer bytesSoFar = topicFetchBytesCount.get(topic);
      int updatedBytes = (bytesSoFar == null ? 0 : bytesSoFar) + records.sizeInBytes();
      topicFetchBytesCount.put(topic, updatedBytes);
    }
  }
}

...

Code Block
languagescala
titleObservers usageType-specific Observer
linenumberstrue
/**
 * Observer parameterized by the request and response types it records.
 *
 * @param <T> request type, bounded by AbstractRequest
 * @param <R> response type, bounded by AbstractResponse
 */
public interface Observer<T extends AbstractRequest, R extends AbstractResponse> {
    // ...

    /**
     * Records a request together with a response.
     *
     * @param request  the request to record
     * @param response the response to record
     */
    void record(T request, R response);

    // ...
}

...

Code Block
languagescala
titleObservers usageType-specific Observer Implementation
linenumberstrue
class ProduceObserver <ProduceRequest, ProduceResponse> {
	// ...
	public void record(ProduceRequest request, ProduceResponse response);
	// ...
}

...