Status

Current state: Approved

Discussion thread: here

JIRA: KAFKA-10525

...

Each request and response type now has an auto-generated schema that can output its payload as JSON. These schemas can be adapted to provide structured request tracing.

Public Interfaces

The new auto-generated schemas generate a converter class, XYZDataJsonConverter, for each request/response type (such as FetchRequestDataJsonConverter) when ./gradlew processMessages is run. The signature looks as follows:

Method signature of the auto-generated XYZDataJsonConverter:

public class XYZDataJsonConverter {

	/* Converts the request/response data into a JsonNode */
	public static JsonNode write(XYZData data, short version);

...
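As a rough illustration of what such a converter does, the sketch below hand-writes one for a made-up message type. ExampleData, its fields, and the rule that leaderEpoch exists only from version 2 onward are all hypothetical. The real generated classes return a Jackson JsonNode; this sketch returns a plain String so that it needs only the JDK.

```java
/** Hypothetical message type, standing in for a generated XYZData class. */
final class ExampleData {
    final String name;
    final int partitionIndex;
    final int leaderEpoch; // assumed to exist only from version 2 onward

    ExampleData(String name, int partitionIndex, int leaderEpoch) {
        this.name = name;
        this.partitionIndex = partitionIndex;
        this.leaderEpoch = leaderEpoch;
    }
}

/** Hand-written stand-in for an auto-generated converter (illustrative only). */
final class ExampleDataJsonConverter {

    /** Renders the fields that are valid for the given version as a JSON object. */
    static String write(ExampleData data, short version) {
        StringBuilder json = new StringBuilder("{");
        json.append("\"name\":").append(quote(data.name));
        json.append(",\"partitionIndex\":").append(data.partitionIndex);
        if (version >= 2) {
            // Version-gated field: left out for older versions.
            json.append(",\"leaderEpoch\":").append(data.leaderEpoch);
        }
        return json.append("}").toString();
    }

    /** Minimal string quoting: escapes only backslashes and double quotes. */
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

The point of the version argument is that the same data object can be rendered differently depending on which protocol version the request or response was sent with.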

Method signature of RequestConvertToJson (Scala):

object RequestConvertToJson {

	/**
	 * The data converter for request types, which calls the appropriate
	 * XYZRequestDataJsonConverter.write()
	 * @return JsonNode
	 */
	def request(request: AbstractRequest, verbose: Boolean): JsonNode

	/**
	 * The data converter for response types, which calls the appropriate
	 * XYZResponseDataJsonConverter.write()
	 * @return JsonNode
	 */
	def response(response: AbstractResponse, version: Short): JsonNode
}

...

Completed request: {"requestHeader": {"apiKey": "LIST_OFFSETS", "apiVersion": 5, "clientId": "consumer-group-1", "correlationId": 8799}, "request": {"replicaId":-1,"isolationLevel":0,"topics":[{"name":"test_topic","partitions":[{"partitionIndex":0,"timestamp":1599676632886,"currentLeaderEpoch":0}]}]},"response":{"throttleTimeMs":0,"topics":[{"name":"test_topic","partitions":[{"partitionIndex":0,"errorCode":0,"timestamp":1599676632886,"offset":8700,"leaderEpoch":0}]}]}, "connection": "127.0.0.1:62610-127.0.0.1:62622-3", "totalTime": 0.085, "requestQueueTime": 0.013, "localTime": 0.033, "remoteTime":0.011, "throttleTime":0,"responseQueueTime":0.011,"sendTime":0.015,"sendIoTime":0.01,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":"ClientInformation(softwareName=apache-kafka-java, softwareVersion=unknown)"}
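Because everything after the "Completed request: " prefix is a single JSON object, a log consumer can pull out individual fields mechanically. The sketch below is one JDK-only way to do that for numeric fields such as totalTime. RequestLogFields and its methods are hypothetical names, and the regular expression simply finds the first occurrence of the key anywhere in the payload, so a real consumer would more likely hand the payload to a JSON library.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for reading fields out of a JSON request log line. */
final class RequestLogFields {
    private static final String PREFIX = "Completed request: ";

    /** Returns the JSON text after the log prefix, or empty if the prefix is absent. */
    static Optional<String> payload(String logLine) {
        int start = logLine.indexOf(PREFIX);
        return start < 0
                ? Optional.empty()
                : Optional.of(logLine.substring(start + PREFIX.length()));
    }

    /** Finds the first numeric value stored under the given key, e.g. "totalTime". */
    static Optional<Double> numericField(String json, String key) {
        Pattern p = Pattern.compile(
                "\"" + Pattern.quote(key) + "\"\\s*:\\s*(-?\\d+(?:\\.\\d+)?)");
        Matcher m = p.matcher(json);
        return m.find()
                ? Optional.of(Double.parseDouble(m.group(1)))
                : Optional.empty();
    }
}
```

For the sample line above, looking up totalTime on the extracted payload would yield the value logged under that key, while a key that is not present yields an empty Optional.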


A serializeRecords parameter was added to the auto-generated schemas so that ProduceRequest and FetchResponse logs can output either the record's bytes or the record's size in bytes.
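The effect of such a flag can be sketched as follows. This is only an illustration of the either/or behaviour described above: the class name, the key names "records" and "recordsSizeInBytes", and the choice of Base64 for the byte payload are assumptions, not taken from the generated code.

```java
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of a serializeRecords switch. */
final class RecordsFieldSketch {

    /**
     * Returns the field that would be logged for a records payload:
     * the bytes themselves (Base64 text here) or only their size.
     */
    static Map<String, Object> recordsField(byte[] records, boolean serializeRecords) {
        Map<String, Object> node = new LinkedHashMap<>();
        if (serializeRecords) {
            // Assumed key name; emits the full payload.
            node.put("records", Base64.getEncoder().encodeToString(records));
        } else {
            // Assumed key name; emits only the payload size in bytes.
            node.put("recordsSizeInBytes", records.length);
        }
        return node;
    }
}
```

Logging only the size keeps request log lines short for large produce and fetch payloads, which is presumably why the choice is exposed as a parameter.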

Proposed trace fields

Current Trace Field    New Key
-------------------    -------------------
RequestHeader          "requestHeader"
--                     "request"
response               "response"
from connection        "connection"
totalTime              "totalTime"
requestQueueTime       "requestQueueTime"
localTime              "localTime"
remoteTime             "remoteTime"
throttleTime           "throttleTime"
responseQueueTime      "responseQueueTime"
sendTime               "sendTime"
sendIoTime             "sendIoTime"
securityProtocol       "securityProtocol"
principal              "principal"
listener               "listener"
clientInformation      "clientInformation"

...
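To make the key mapping in the trace-field table concrete, the sketch below assembles a top-level trace object using a few of the new keys. RequestTraceSketch is a hypothetical JDK-only helper: it takes the header, request, and response as already-rendered JSON text and the timing fields as a map, whereas the broker-side code would work with JsonNode objects instead of strings.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical assembly of the top-level trace object (illustrative only). */
final class RequestTraceSketch {

    /**
     * Joins pre-rendered JSON fragments and timing values under the new keys.
     * Timing values are assumed to be finite numbers.
     */
    static String trace(String requestHeaderJson, String requestJson, String responseJson,
                        String connection, Map<String, Double> timings) {
        StringJoiner out = new StringJoiner(",", "{", "}");
        out.add("\"requestHeader\":" + requestHeaderJson);
        out.add("\"request\":" + requestJson);
        out.add("\"response\":" + responseJson);
        out.add("\"connection\":" + quote(connection));
        for (Map.Entry<String, Double> e : timings.entrySet()) {
            // e.g. "totalTime", "requestQueueTime", "localTime"
            out.add(quote(e.getKey()) + ":" + e.getValue());
        }
        return out.toString();
    }

    /** Minimal string quoting: escapes only backslashes and double quotes. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

Passing the timings in an insertion-ordered map such as LinkedHashMap keeps the output keys in a stable order, which makes log lines easier to compare by eye.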