Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagescala
titleConfiguration propertyHow request is recorded
linenumberstrue
/**
 * Parses the request carried in {@code buffer} and notifies the configured observers.
 *
 * <p>This method is called on every RequestContext instance, and a RequestContext instance is
 * constructed when SocketServer receives a request.
 *
 * @param buffer the buffer handed to {@code apiKey.parseRequest} to decode the request
 * @return the parsed request together with its size; the size is 0 for an unsupported
 *         ApiVersions request, because that payload is not parsed
 * @throws InvalidRequestException if anything is thrown while parsing the request or while an
 *         observer records it; the original throwable is kept as the cause
 */
public RequestAndSize parseRequest(ByteBuffer buffer) {
    // Unsupported ApiVersions requests are treated as v0 requests and are not parsed.
    if (isUnsupportedApiVersionsRequest()) {
        return new RequestAndSize(new ApiVersionsRequest((short) 0, header.apiVersion()), 0);
    }

    final ApiKeys apiKey = header.apiKey();
    try {
        final short apiVersion = header.apiVersion();
        final Struct parsed = apiKey.parseRequest(apiVersion, buffer);

        // Run the observing logic only when at least one observer implementation is provided.
        final boolean hasObservers = observers != null && observers.size() > 0;
        if (hasObservers) {
            // A single ObservableStructImpl wraps the parsed Struct and is shared by all observers.
            final ObservableStructImpl observable = new ObservableStructImpl(parsed);
            for (Observer each : observers) {
                each.recordRequest(observable, clientAddress, principal, apiKey, apiVersion, connectionId);
            }
        }

        final AbstractRequest request = AbstractRequest.parseRequest(apiKey, apiVersion, parsed);
        return new RequestAndSize(request, parsed.sizeOf());
    } catch (Throwable cause) {
        final String message = "Error getting request for apiKey: " + apiKey +
                ", apiVersion: " + header.apiVersion() +
                ", connectionId: " + connectionId +
                ", listenerName: " + listenerName +
                ", principal: " + principal;
        throw new InvalidRequestException(message, cause);
    }
}

...

Code Block
languagescala
titleObservers usageHow response is recorded
linenumberstrue
/**
 * Builds the {@code Send} for a response. This method is called on every response.
 *
 * @param destination passed to {@code serialize} as its first argument and used as the
 *                    destination of the resulting {@code NetworkSend}
 * @param header      the response header to serialize
 * @param apiVersion  the version used to serialize the response
 * @param observers   observers forwarded to {@code serialize}
 * @return a {@code NetworkSend} wrapping the serialized response
 */
protected Send toSend(String destination, ResponseHeader header, short apiVersion, List<Observer> observers) {
    final ByteBuffer payload = serialize(destination, apiVersion, header, observers);
    return new NetworkSend(destination, payload);
}

/**
 * Converts this response to a {@code Struct}, lets every observer record it, and serializes it
 * together with the response header.
 *
 * @param connectionId   identifier passed through to each observer's {@code recordResponse}
 * @param version        the version used for {@code toStruct} and reported to the observers
 * @param responseHeader the header serialized in front of the response body
 * @param observers      observers to notify; may be {@code null} or empty, in which case the
 *                       observing step is skipped
 * @return the buffer produced by serializing the header struct and the response struct
 */
public ByteBuffer serialize(String connectionId, short version, ResponseHeader responseHeader, List<Observer> observers) {
    final Struct body = toStruct(version);

    // Run the observing logic only when at least one observer implementation is provided.
    final boolean hasObservers = observers != null && !observers.isEmpty();
    if (hasObservers) {
        // One ObservableStructImpl wraps the response Struct and is shared by all observers.
        final ObservableStructImpl observable = new ObservableStructImpl(body);
        for (Observer each : observers) {
            each.recordResponse(observable, version, connectionId);
        }
    }

    final Struct headerStruct = responseHeader.toStruct();
    return serialize(headerStruct, body);
}

...

Code Block
languagescala
titleObservers usageObserver Partial Implementation Example
linenumberstrue

/**
 * Example observer callback that, for produce requests, sums the record bytes per topic.
 *
 * @param struct        observable view of the parsed request
 * @param clientAddress address of the client (unused in this partial example)
 * @param principal     principal of the client (unused in this partial example)
 * @param apiKey        request type; only {@code ApiKeys.PRODUCE} is handled here
 * @param apiVersion    version of the request (unused in this partial example)
 * @param correlationId identifier supplied by the caller (unused in this partial example).
 *                      NOTE(review): the parseRequest call site passes its connectionId
 *                      here - confirm the intended name.
 */
public void recordRequest(ObservableStructImpl struct,
                          InetAddress clientAddress,
                          KafkaPrincipal principal,
                          ApiKeys apiKey,
                          short apiVersion,
                          String correlationId) {

  if (apiKey == ApiKeys.PRODUCE) {
    // The information extracted from the ObservableStructImpl: bytes produced per topic
    Map<String, Integer> topicProduceBytesCount = new HashMap<>();

    for (ObservableStructImpl topicDataStruct : struct.getObservableStructImplArray("topic_data")) {

      String topic = topicDataStruct.get(TOPIC_NAME);

      for (ObservableStructImpl partitionDataStruct : topicDataStruct.getObservableStructImplArray("data")) {

        MemoryRecords records = (MemoryRecords) partitionDataStruct.getRecords("record_set");

        // merge() stores the size for a topic seen for the first time and adds to the running
        // total otherwise. The previous if/else had two identical branches, so the first
        // occurrence of a topic unboxed a null from get(topic) and threw NullPointerException.
        topicProduceBytesCount.merge(topic, records.sizeInBytes(), Integer::sum);
      }
    }
  } else if (apiKey == ApiKeys.FETCH) {
    // ...
  }
}

/**
 * Example observer callback that sums the fetched record bytes per topic from a response.
 *
 * @param struct        observable view of the response
 * @param apiVersion    version of the response (unused in this partial example)
 * @param correlationId identifier meant to be used to work out which request type this
 *                      response belongs to (that lookup is elided in this partial example)
 * @throws IllegalStateException if a partition's records are not {@code MemoryRecords}
 */
public void recordResponse(ObservableStructImpl struct, short apiVersion, String correlationId) {

  // ...
  // Use correlationId to figure out to what request type this response is
  // ...
  // The information extracted from the ObservableStructImpl: bytes fetched per topic
  Map<String, Integer> topicFetchBytesCount = new HashMap<>();

  for (ObservableStructImpl topicDataStruct : struct.getObservableStructImplArray("responses")) {
    String topic = topicDataStruct.get(TOPIC_NAME);

    for (ObservableStructImpl partitionResponseStruct : topicDataStruct.getObservableStructImplArray("partition_responses")) {
      BaseRecords baseRecords = partitionResponseStruct.getRecords("record_set");
      if (!(baseRecords instanceof MemoryRecords)) {
        throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
      }
      MemoryRecords records = (MemoryRecords) baseRecords;

      // merge() stores the size for a topic seen for the first time and adds to the running
      // total otherwise. The previous if/else had two identical branches, so the first
      // occurrence of a topic unboxed a null from get(topic) and threw NullPointerException.
      topicFetchBytesCount.merge(topic, records.sizeInBytes(), Integer::sum);
    }
  }
}

...

Code Block
languagescala
titleObservers usageType-specific Observer
linenumberstrue
/**
 * Observer bound to one request type and one response type.
 *
 * @param <T> the request type this observer records
 * @param <R> the response type this observer records
 */
public interface Observer<T extends AbstractRequest, R extends AbstractResponse> {
	// ...

	/**
	 * Records a request together with its response.
	 *
	 * @param request  the request to record
	 * @param response the response to record
	 */
	void record(T request, R response);
	// ...
}

...

Code Block
languagescala
titleObservers usageType-specific Observer Implementation
linenumberstrue
class ProduceObserver <ProduceRequest, ProduceResponse> {
	// ...
	public void record(ProduceRequest request, ProduceResponse response);
	// ...
}

...