Status
Current state: Approved
Discussion thread: here
JIRA: KAFKA-10525
...
Each request type now has an auto-generated schema that can output request and response payloads as JSON. These schemas can be adapted to provide structured request tracing.
Public Interfaces
The new auto-generated schemas generate a converter class, XYZDataJsonConverter, for each request/response type (for example, FetchRequestDataJsonConverter) when ./gradlew processMessages is run. The signature looks as follows:
    public class XYZDataJsonConverter {
        /* Converts the request/response data into a JsonNode */
        public static JsonNode write(XYZData data, short version);
...
    object RequestConvertToJson {
      /**
       * The data converter for request types which calls the appropriate
       * XYZRequestDataJsonConverter.write()
       * @return JsonNode
       */
      def request(request: AbstractRequest, verbose: Boolean): JsonNode

      /**
       * The data converter for response types which calls the appropriate
       * XYZResponseDataJsonConverter.write()
       * @return JsonNode
       */
      def response(response: AbstractResponse, version: Short): JsonNode
    }
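The dispatch idea behind RequestConvertToJson can be illustrated with a minimal Java sketch. Everything in it is an assumption made for illustration: the nested data classes are simplified stand-ins for the generated ones, the per-type methods stand in for the generated XYZRequestDataJsonConverter.write() calls, and JSON is rendered as a plain String rather than a JsonNode so that the example needs no external library.

```java
final class ConverterDispatchSketch {
    // Hypothetical stand-ins for two generated request data classes.
    static final class FetchRequestData {
        final int replicaId;
        FetchRequestData(int replicaId) { this.replicaId = replicaId; }
    }

    static final class ListOffsetsRequestData {
        final int isolationLevel;
        ListOffsetsRequestData(int isolationLevel) { this.isolationLevel = isolationLevel; }
    }

    // Stand-ins for the per-type generated converters. The version argument is
    // accepted to mirror the signature above but is unused in this sketch.
    static String fetchToJson(FetchRequestData data, short version) {
        return "{\"replicaId\":" + data.replicaId + "}";
    }

    static String listOffsetsToJson(ListOffsetsRequestData data, short version) {
        return "{\"isolationLevel\":" + data.isolationLevel + "}";
    }

    // Picks the converter that matches the runtime type of the request data.
    static String request(Object data, short version) {
        if (data instanceof FetchRequestData) {
            return fetchToJson((FetchRequestData) data, version);
        }
        if (data instanceof ListOffsetsRequestData) {
            return listOffsetsToJson((ListOffsetsRequestData) data, version);
        }
        throw new IllegalArgumentException(
            "No JSON converter for " + data.getClass().getSimpleName());
    }
}
```

A single entry point of this kind lets the request logger stay unaware of individual request types; an unknown type fails loudly instead of being logged in a partial form.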
...
Completed request: {"requestHeader": {"apiKey": "LIST_OFFSETS", "apiVersion": 5, "clientId": "consumer-group-1", "correlationId": 8799}, "request": {"replicaId":-1,"isolationLevel":0,"topics":[{"name":"test_topic","partitions":[{"partitionIndex":0,"timestamp":1599676632886,"currentLeaderEpoch":0}]}]},"response":{"throttleTimeMs":0,"topics":[{"name":"test_topic","partitions":[{"partitionIndex":0,"errorCode":0,"timestamp":1599676632886,"offset":8700,"leaderEpoch":0}]}]}, "connection": "127.0.0.1:62610-127.0.0.1:62622-3", "totalTime": 0.085, "requestQueueTime": 0.013, "localTime": 0.033, "remoteTime":0.011, "throttleTime":0,"responseQueueTime":0.011,"sendTime":0.015,"sendIoTime":0.01,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":"ClientInformation(softwareName=apache-kafka-java, softwareVersion=unknown)"}
A serializeRecords parameter was added to the auto-generated schemas so that ProduceRequest and FetchResponse logs can output either the record's bytes or the record's size in bytes.
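The effect of the serializeRecords switch can be sketched as follows. This is an illustration only, not the generated code: the helper name, the "records" and "recordsSizeInBytes" keys, and the Base64 rendering of raw bytes are all assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class RecordsFieldSketch {
    // Hypothetical helper: renders the records portion of a payload as a JSON fragment.
    static String recordsField(byte[] records, boolean serializeRecords) {
        if (serializeRecords) {
            // Assumption: raw record bytes are written out as a Base64 string.
            return "\"records\":\"" + Base64.getEncoder().encodeToString(records) + "\"";
        }
        // Otherwise only the size in bytes is logged, which keeps the log line small.
        return "\"recordsSizeInBytes\":" + records.length;
    }

    // Small usage helper so the sketch can be tried without a real record batch.
    static String demo(boolean serializeRecords) {
        return recordsField("abc".getBytes(StandardCharsets.UTF_8), serializeRecords);
    }
}
```

Logging only the size is the cheaper choice for request tracing, since record batches can be large and may contain data that should not end up in broker logs.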
Proposed trace fields
Current Trace Field | New Key |
---|---|
RequestHeader | "requestHeader" |
-- | "request" |
response | "response" |
from connection | "connection" |
totalTime | "totalTime" |
requestQueueTime | "requestQueueTime" |
localTime | "localTime" |
remoteTime | "remoteTime" |
throttleTime | "throttleTime" |
responseQueueTime | "responseQueueTime" |
sendTime | "sendTime" |
sendIoTime | "sendIoTime" |
securityProtocol | "securityProtocol" |
principal | "principal" |
listener | "listener" |
clientInformation | "clientInformation" |
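As a rough illustration of how the keys in the table combine into one log line, the sketch below assembles a few of them in insertion order. The class and method names are hypothetical, values are passed in as already-rendered JSON, and a real implementation would more likely build a JsonNode than concatenate strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class RequestTraceSketch {
    // Renders pre-serialized JSON values under their keys, preserving map order.
    static String render(Map<String, String> fields) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> entry : fields.entrySet()) {
            joiner.add("\"" + entry.getKey() + "\":" + entry.getValue());
        }
        return "Completed request: " + joiner;
    }

    // Builds a shortened trace line using values from the example log entry.
    static String example() {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("requestHeader", "{\"apiKey\":\"LIST_OFFSETS\",\"apiVersion\":5}");
        fields.put("connection", "\"127.0.0.1:62610-127.0.0.1:62622-3\"");
        fields.put("totalTime", "0.085");
        return render(fields);
    }
}
```

Because every field sits under a fixed key, a log consumer can strip the "Completed request: " prefix and hand the remainder to any JSON parser.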
...