Status
Current state: Approved
Discussion thread: TODO here
JIRA: KAFKA-10525
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Each request type now has an auto-generated schema that can output its request and response payloads as JSON. These can be adapted to provide structured request tracing.
Public Interfaces
The new auto-generated schemas generate a converter class for each request/response type, such as FetchRequestDataJsonConverter, when ./gradlew processMessages is run. The signature looks as follows:
    public class XYZDataJsonConverter {
        /* Converts the request/response data into a JsonNode */
        public static JsonNode write(XYZData data, short version);
    }
A new Scala singleton will be added to handle the data conversion to a JsonNode:
    object RequestConvertToJson {
      /**
       * The data converter for request types which calls the appropriate
       * XYZRequestDataJsonConverter.write()
       * @return JsonNode
       */
      def request(request: AbstractRequest, verbose: Boolean): JsonNode
      /**
       * The data converter for response types which calls the appropriate
       * XYZResponseDataJsonConverter.write()
       * @return JsonNode
       */
      def response(response: AbstractResponse, version: Short): JsonNode
    }
With these helper functions, requests in logs become easy to parse.
...
Completed request: {"requestHeader": {"apiKey": "LIST_OFFSETS", "apiVersion": 5, "clientId": "consumer-group-1", "correlationId": 8799}, "request": {"replicaId":-1,"isolationLevel":0,"topics":[{"name":"test_topic","partitions":[{"partitionIndex":0,"timestamp":1599676632886,"currentLeaderEpoch":0}]}]},"response":{"throttleTimeMs":0,"topics":[{"name":"test_topic","partitions":[{"partitionIndex":0,"errorCode":0,"timestamp":1599676632886,"offset":8700,"leaderEpoch":0}]}]}, "connection": "127.0.0.1:62610-127.0.0.1:62622-3", "totalTime": 0.085, "requestQueueTime": 0.013, "localTime": 0.033, "remoteTime":0.011, "throttleTime":0,"responseQueueTime":0.011,"sendTime":0.015,"sendIoTime":0.01,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":"ClientInformation(softwareName=apache-kafka-java, softwareVersion=unknown)"}
A serializeRecords parameter was added to the auto-generated converters so that ProduceRequest and FetchResponse logs can output either the records' bytes or the records' size in bytes.
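The effect of serializeRecords can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the class SerializeRecordsSketch, the method recordsField, the JSON key names "records" and "recordsSizeInBytes", and the Base64 encoding of the bytes are all hypothetical stand-ins, and the generated converters build a JsonNode rather than a string.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class SerializeRecordsSketch {
    // Hypothetical stand-in for how a generated converter might treat a
    // records field. Key names and Base64 encoding are assumptions.
    static String recordsField(byte[] records, boolean serializeRecords) {
        if (serializeRecords) {
            // Emit the record bytes themselves, encoded so they fit in a JSON string.
            return "\"records\":\"" + Base64.getEncoder().encodeToString(records) + "\"";
        }
        // Emit only the size, which keeps request log lines short.
        return "\"recordsSizeInBytes\":" + records.length;
    }

    public static void main(String[] args) {
        byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);
        System.out.println(recordsField(payload, false));
        System.out.println(recordsField(payload, true));
    }
}
```

Logging only the size is likely the more practical choice for request logs, since produce and fetch payloads can be large.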
Proposed trace fields
Current Trace Field | New Key |
---|---|
RequestHeader | "requestHeader" |
-- | "request" |
response | "response" |
from connection | "connection" |
totalTime | "totalTime" |
requestQueueTime | "requestQueueTime" |
localTime | "localTime" |
remoteTime | "remoteTime" |
throttleTime | "throttleTime" |
responseQueueTime | "responseQueueTime" |
sendTime | "sendTime" |
sendIoTime | "sendIoTime" |
securityProtocol | "securityProtocol" |
principal | "principal" |
listener | "listener" |
clientInformation | "clientInformation" |
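To make the key mapping concrete, the sketch below assembles a few of the proposed keys into a single log line. It is a minimal illustration using only the Java standard library: the class TraceLineSketch and its methods are hypothetical, the values are taken from the sample log line, and a real implementation would presumably build the object with a JSON library rather than by hand.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TraceLineSketch {
    // Renders an insertion-ordered map as a flat JSON object.
    // Numbers are written bare; everything else is quoted and minimally escaped.
    static String render(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(e.getKey()).append("\":");
            Object value = e.getValue();
            if (value instanceof Number) {
                sb.append(value);
            } else {
                String escaped = value.toString().replace("\\", "\\\\").replace("\"", "\\\"");
                sb.append('"').append(escaped).append('"');
            }
        }
        return sb.append('}').toString();
    }

    // Builds a shortened "Completed request" line from a subset of the trace fields.
    static String completedRequestLine() {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("connection", "127.0.0.1:62610-127.0.0.1:62622-3");
        fields.put("totalTime", 0.085);
        fields.put("securityProtocol", "PLAINTEXT");
        fields.put("principal", "User:ANONYMOUS");
        return "Completed request: " + render(fields);
    }

    public static void main(String[] args) {
        System.out.println(completedRequestLine());
    }
}
```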
Proposed Changes
Make each request type’s data accessible. Construct a helper class RequestConvertToJson to handle converting the request data to a parsable JSON format.
To log a request, the appropriate helper function in RequestConvertToJson is called on the request. From there, the respective XYZDataJsonConverter is called and returns the JsonNode of the data, which, when converted to a string, is in a parsable JSON format.
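The dispatch described above can be sketched as follows. This is a simplified, self-contained illustration rather than the proposed implementation: every type here (MessageData, ListOffsetsRequestData, ExampleRequestData, and their converters) is a hypothetical stand-in, strings take the place of JsonNode, and the helper is written in Java rather than Scala.

```java
class ConvertDispatchSketch {
    // Hypothetical stand-ins for the generated message data classes.
    interface MessageData {}

    static final class ListOffsetsRequestData implements MessageData {
        final int replicaId;
        final int isolationLevel;

        ListOffsetsRequestData(int replicaId, int isolationLevel) {
            this.replicaId = replicaId;
            this.isolationLevel = isolationLevel;
        }
    }

    static final class ExampleRequestData implements MessageData {
        final String name;

        ExampleRequestData(String name) {
            this.name = name;
        }
    }

    // Stand-ins for generated XYZDataJsonConverter classes.
    // They return a String here; the version argument is unused in this sketch.
    static final class ListOffsetsRequestDataJsonConverter {
        static String write(ListOffsetsRequestData data, short version) {
            return "{\"replicaId\":" + data.replicaId
                + ",\"isolationLevel\":" + data.isolationLevel + "}";
        }
    }

    static final class ExampleRequestDataJsonConverter {
        static String write(ExampleRequestData data, short version) {
            return "{\"name\":\"" + data.name + "\"}";
        }
    }

    // Plays the role of the request helper: select the converter by data type.
    static String request(MessageData data, short version) {
        if (data instanceof ListOffsetsRequestData) {
            return ListOffsetsRequestDataJsonConverter.write((ListOffsetsRequestData) data, version);
        } else if (data instanceof ExampleRequestData) {
            return ExampleRequestDataJsonConverter.write((ExampleRequestData) data, version);
        }
        throw new IllegalStateException("No JSON converter for " + data.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        System.out.println(request(new ListOffsetsRequestData(-1, 0), (short) 5));
    }
}
```

Failing loudly on an unhandled type is one possible design choice; it would surface a newly added request type that has no converter wired in yet.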
...