...
Our goal is to minimize the time it takes a client to discover a new leader and to act on that information as soon as possible. This is beneficial in a number of situations, such as cluster rolls, partition reassignments, or preferred leader elections.
Note: performance tests were run to quantify the improvements from the proposed changes; results are presented in the Benchmark Results section below.
Proposed Changes
This KIP proposes to make new leader information available in the ProduceResponse and FetchResponse when a newer leader is known by the receiving broker. This should help us eliminate the metadata refresh from the critical path of the Produce and Fetch requests, which can be very latency-sensitive. This is an optimization mentioned in KIP-595, which uses a similar technique but only for the metadata log and the Fetch RPC.
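To make the idea concrete, the client-side handling could look like the following minimal Python sketch. The real implementation lives in the Java clients; the class and method names here are illustrative, not Kafka API names.

```python
from dataclasses import dataclass


@dataclass
class LeaderAndEpoch:
    leader_id: int
    leader_epoch: int


class PartitionLeaderCache:
    """Illustrative client-side cache of partition leaders."""

    def __init__(self):
        self._leaders = {}  # (topic, partition) -> LeaderAndEpoch

    def maybe_update(self, topic, partition, candidate: LeaderAndEpoch) -> bool:
        # Only accept leader info from a response if its epoch is strictly
        # newer than what is already cached; stale responses are ignored.
        current = self._leaders.get((topic, partition))
        if current is None or candidate.leader_epoch > current.leader_epoch:
            self._leaders[(topic, partition)] = candidate
            return True   # retry the request against the new leader immediately
        return False      # keep the cached leader; wait for a metadata refresh
```

On an error such as NOT_LEADER_OR_FOLLOWER, the client would read the response's CurrentLeader field, call `maybe_update`, and on success redirect the retry to the endpoint listed in NodeEndpoints, skipping the metadata round trip.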
...
```json
{
  "apiKey": 0,
  "type": "response",
  "name": "ProduceResponse",
  "validVersions": "0-9",
  "flexibleVersions": "9+",
  "fields": [
    { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
      "about": "Each produce response", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "mapKey": true, "about": "The topic name" },
      { "name": "PartitionResponses", "type": "[]PartitionProduceResponse", "versions": "0+",
        "about": "Each partition that we produced to within the topic.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+", "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "BaseOffset", "type": "int64", "versions": "0+", "about": "The base offset." },
        { "name": "LogAppendTimeMs", "type": "int64", "versions": "2+", "default": "-1",
          "ignorable": true,
          "about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1. If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1",
          "ignorable": true, "about": "The log start offset." },
        { "name": "RecordErrors", "type": "[]BatchIndexAndErrorMessage", "versions": "8+",
          "ignorable": true,
          "about": "The batch indices of records that caused the batch to be dropped", "fields": [
          { "name": "BatchIndex", "type": "int32", "versions": "8+",
            "about": "The batch index of the record that caused the batch to be dropped" },
          { "name": "BatchIndexErrorMessage", "type": "string", "default": "null",
            "versions": "8+", "nullableVersions": "8+",
            "about": "The error message of the record that caused the batch to be dropped" }
        ]},
        { "name": "ErrorMessage", "type": "string", "default": "null", "versions": "8+",
          "nullableVersions": "8+", "ignorable": true,
          "about": "The global error message summarizing the common root cause of the records that caused the batch to be dropped" },
        // ---------- Start new field ----------
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "9+",
          "taggedVersions": "9+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "9+", "default": "-1",
            "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "9+", "default": "-1",
            "about": "The latest known leader epoch" }
        ]}
        // ---------- End new field ----------
      ]}
    ]},
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "default": "0",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    // ---------- Start new field ----------
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "9+",
      "taggedVersions": "9+", "tag": 0,
      "about": "Endpoints for all current-leaders enumerated in PartitionData.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "9+", "mapKey": true,
        "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "9+", "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "9+", "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "9+", "nullableVersions": "9+",
        "ignorable": true, "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
    // ---------- End new field ----------
  ]
}
```
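For readers unfamiliar with the schema notation, the `versions` strings above ("0+", "0-9", "9+") denote version ranges that gate when a field is serialized. A small hypothetical parser (this helper is illustrative, not part of Kafka's codebase) makes the gating explicit:

```python
def version_in_range(spec: str, version: int) -> bool:
    """Check whether an API version falls inside a schema versions spec.

    "9+"  -> version >= 9
    "0-9" -> 0 <= version <= 9
    "5"   -> version == 5
    """
    if spec.endswith("+"):
        return version >= int(spec[:-1])
    if "-" in spec:
        lo, hi = spec.split("-")
        return int(lo) <= version <= int(hi)
    return version == int(spec)
```

Under this reading, the new CurrentLeader and NodeEndpoints fields are only serialized for requests negotiated at version 9 or later, so older clients are unaffected.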
Benchmark Results
These are the benchmark results for the leader-discovery optimization. Performance was tested on a low-partition and a high-partition workload; the setup is described under Workload Details. We see up to a 5% improvement in p99 E2E latency with acks=all, and up to a 9% improvement in p99 produce latency with acks=1. Our hypothesis for why acks=1 improves more than acks=all is that, during software upgrades, server-side metadata-convergence delays for partition movement are larger than the client-side redirection delay this KIP removes, and those server-side delays affect acks=all requests more than acks=1 requests. We believe this is also why the low-partition workload shows larger improvements with acks=1 than the high-partition workload. All results are averaged over 2 runs.
Latency improvement of workloads run with acks=all
|  | Baseline latency (ms) | Optimized latency (ms) | Improvement |
|---|---|---|---|
| High Partitions |  |  |  |
| p99 E2E | 188 | 184 | 2.1% |
| p99 Produce | 155.65 | 151.8 | 2.5% |
| Low Partitions |  |  |  |
| p99 E2E | 393 | 374.5 | 4.7% |
| p99 Produce | 390.95 | 374.35 | 4.2% |
Latency improvement of workloads run with acks=1
|  | Baseline latency (ms) | Optimized latency (ms) | Improvement |
|---|---|---|---|
| High Partitions |  |  |  |
| p99 E2E | 106.5 | 101 | 5.2% |
| p99 Produce | 84.7 | 83.3 | 1.7% |
| Low Partitions |  |  |  |
| p99 E2E | 12.5 | 12.5 | 0% |
| p99 Produce | 3.25 | 2.95 | 9.2% |
Workload Details
All tests are run on 6 m5.xlarge Apache Kafka brokers, with a KRaft metadata quorum running on 3 m5.xlarge instances. The clients are 6 m5.xlarge instances running the OpenMessagingBenchmark. Each test runs for 70 minutes, during which the brokers are restarted one by one with a 10-minute interval between restarts.
High partitions workload parameters:
One 9,000 partition topic
~12 MB/s ingress
512 B message size
23,400 messages/s distributed over 60 producers
~36 MB/s egress
3 subscriptions, 60 consumers per subscription
Low partitions workload parameters:
One 108 partition topic
~120 MB/s ingress
512 B message size
234,000 messages/s distributed over 6 producers
~360 MB/s egress
3 subscriptions, 6 consumers per subscription
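The throughput figures above follow from the message size and rate; a quick sanity check of the arithmetic:

```python
MESSAGE_SIZE = 512  # bytes per message

# High-partitions workload: 23,400 messages/s across 60 producers
high_ingress_mb = 23_400 * MESSAGE_SIZE / 1_000_000   # ~12 MB/s
# Low-partitions workload: 234,000 messages/s across 6 producers
low_ingress_mb = 234_000 * MESSAGE_SIZE / 1_000_000   # ~120 MB/s

# Each of the 3 subscriptions consumes the full stream, so egress is 3x ingress.
high_egress_mb = 3 * high_ingress_mb                  # ~36 MB/s
low_egress_mb = 3 * low_ingress_mb                    # ~360 MB/s
```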
Compatibility, Deprecation, and Migration Plan
...