Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: In DiscussionApproved

Discussion threadhere

JIRAKAFKA-10525

...

There is a new auto-generated schema for each request type that supports outputting JSON payloads for request and response payloads. These can be adapted to provide structured request tracing.

Public Interfaces

The new auto-generated schemas generate a converter class for each request/response type such as FetchRequestDataJsonConverter when ./gradlew processMessages is run. The signature looks as follows:

Code Block
titleMethod Signature of the auto-generated XYZDataJsonConverter
public class XYZDataJsonConverter {

	/* Converts the request/response data into a JsonNode*/
	public static JsonNode write(XYZData data, short version);


A new Scala singleton will be added to handle the data conversion to a JsonNode

Code Block
languagescala
titleMethod Signature of RequestConvertToJson
object RequestConvertToJson {

	/**
	 * The data converter for request types which calls the appropriate
	 * XYZRequestDataJsonConverter.write()
	 * @return JsonNode
	 */
	def request(request: AbstractRequest, verbose: Boolean): JsonNode

	
	/**
	 * The data converter for response types which calls the appropriate
	 * XYZResponseDataJsonConverter.write()
	 * @return JsonNode
	 */
	def response(response: AbstractResponse, version: Short): JsonNode
}


With these helper functions, Requests in logs will become easily parsable.

...

Completed request: {"requestHeader": {"apiKey": "LIST_OFFSETS", "apiVersion": 5, "clientId": "consumer-group-1", "correlationId": 8799}, "request": {"replicaId":-1,"isolationLevel":0,"topics":[{"name":"test_topic","partitions":[{"partitionIndex":0,"timestamp":1599676632886,"currentLeaderEpoch":0}]}]},"response":{"throttleTimeMs":0,"topics":[{"name":"test_topic","partitions":[{"partitionIndex":0,"errorCode":0,"timestamp":1599676632886,"offset":8700,"leaderEpoch":0}]}]}, "connection": "127.0.0.1:62610-127.0.0.1:62622-3", "totalTime": 0.085, "requestQueueTime": 0.013, "localTime": 0.033, "remoteTime":0.011, "throttleTime":0,"responseQueueTime":0.011,"sendTime":0.015,"sendIoTime":0.01,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":"ClientInformation(softwareName=apache-kafka-java, softwareVersion=unknown)"}


The addition of a serializeRecords parameter was added to the auto-generated schemas so that ProduceRequest and FetchResponse logs can either output the record's bytes or the record's size in bytes.

Proposed trace fields

Current Trace FieldNew Key

RequestHeader

"requestHeader"

--

"request"

response

"response"

from connection

"connection"

totalTime

"totalTime"

requestQueueTime

"requestQueueTime"

localTime

"localTime"

remoteTime

"remoteTime"

throttleTime

"throttleTime"

responseQueueTime

"responseQueueTime"

sendTime

"sendTime"

sendIoTime

"sendIoTime"

securityProtocol

"securityProtocol"

principal

"principal"

listener

"listener"

clientInformation

"clientInformation"

...

In order to a log request, the appropriate helper function in RequestConvertToJson is called on the request. From there, the respective XYZJsonDataConverter XYZDataJsonConverter is called and returns the JsonNode of the data which when converted to a string is in a parsable JSON format.

...