THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
// This method is called on every RequestContext instance and RequestContext instance is constructed when SocketServer receives a request public RequestAndSize parseRequest(ByteBuffer buffer) { if (isUnsupportedApiVersionsRequest()) { // Unsupported ApiVersion requests are treated as v0 requests and are not parsed ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest((short) 0, header.apiVersion()); return new RequestAndSize(apiVersionsRequest, 0); } else { ApiKeys apiKey = header.apiKey(); try { short apiVersion = header.apiVersion(); Struct struct = apiKey.parseRequest(apiVersion, buffer); // Execute the observing logic only when there is at least on observer implementation is provided if (observers != null && observers.size() > 0) { // Convert Struct to ObservableStructImpl which is recorded by each observer ObservableStructImpl observableStruct = new ObservableStructImpl(struct); for (Observer observer : observers) { observer.recordRequest(observableStruct, clientAddress, principal, apiKey, apiVersion, connectionId); } } AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); return new RequestAndSize(body, struct.sizeOf()); } catch (Throwable ex) { throw new InvalidRequestException("Error getting request for apiKey: " + apiKey + ", apiVersion: " + header.apiVersion() + ", connectionId: " + connectionId + ", listenerName: " + listenerName + ", principal: " + principal, ex); } } } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
// This method is called on every response protected Send toSend(String destination, ResponseHeader header, short apiVersion, List<Observer> observers) { return new NetworkSend(destination, serialize(destination, apiVersion, header, observers)); } public ByteBuffer serialize(String connectionId, short version, ResponseHeader responseHeader, List<Observer> observers) { Struct responseStruct = toStruct(version); // Execute the observing logic only when there is at least on observer implementation is provided if (observers != null && observers.size() > 0) { ObservableStructImpl observableStruct = new ObservableStructImpl(responseStruct); for (Observer observer : observers) { observer.recordResponse(observableStruct, version, connectionId); } } return serialize(responseHeader.toStruct(), responseStruct); } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public void recordRequest(ObservableStructImpl struct,
InetAddress clientAddress,
KafkaPrincipal principal,
ApiKeys apiKey,
short apiVersion,
String correlationId) {
if (apiKey == ApiKeys.PRODUCE) {
// The information extracted from the ObservableStructImpl
Map<String, Integer> topicProduceBytesCount = new HashMap<>();
for (ObservableStructImpl topicDataStruct : struct.getObservableStructImplArray("topic_data")) {
String topic = topicDataStruct.get(TOPIC_NAME);
for (ObservableStructImpl partitionResponseStruct : topicDataStruct.getObservableStructImplArray("data")) {
MemoryRecords records = (MemoryRecords) partitionResponseStruct.getRecords("record_set");
if (topicProduceBytesCount.containsKey(topic)) {
topicProduceBytesCount.put(topic, topicProduceBytesCount.get(topic) + records.sizeInBytes());
} else {
topicProduceBytesCount.put(topic, topicProduceBytesCount.get(topic) + records.sizeInBytes());
}
}
}
} else if (apiKey == ApiKeys.FETCH) {
// ...
}
}
public void recordResponse(ObservableStructImpl struct, short apiVersion, String correlationId) {
// ...
// Use correlationId to figure out to what request type this response is
// ...
// The information extracted from the ObservableStructImpl
Map<String, Integer> topicFetchBytesCount = new HashMap<>();
for (ObservableStructImpl topicDataStruct : struct.getObservableStructImplArray("responses")) {
String topic = topicDataStruct.get(TOPIC_NAME);
for (ObservableStructImpl partitionResponseStruct : topicDataStruct.getObservableStructImplArray("partition_responses")) {
BaseRecords baseRecords = partitionResponseStruct.getRecords("record_set");
if (!(baseRecords instanceof MemoryRecords))
throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
MemoryRecords records = (MemoryRecords) baseRecords;
if (topicFetchBytesCount.containsKey(topic)) {
topicFetchBytesCount.put(topic, topicFetchBytesCount.get(topic) + records.sizeInBytes());
} else {
topicFetchBytesCount.put(topic, topicFetchBytesCount.get(topic) + records.sizeInBytes());
}
}
}
} |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public interface Observer <T extends AbstractRequest, R extends AbstractResponse> { // ... public void record(T request, R response); // ... } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
class ProduceObserver <ProduceRequest, ProduceResponse> { // ... public void record(ProduceRequest request, ProduceResponse response); // ... } |
...