
...

Our goal is to minimize the time taken to discover a new leader and to use that information on the client as soon as possible. This is beneficial in a number of situations, such as cluster rolls, partition reassignments, or preferred leader election.

Note: performance tests were run to demonstrate the improvements from the proposed changes; see the Benchmark Results section below.

Proposed Changes

This KIP proposes to make new leader information available in the ProduceResponse and FetchResponse when a newer leader is known by the receiving broker. This should help us eliminate the metadata refresh from the critical path of the Produce and Fetch requests, which can be very latency-sensitive. This is an optimization mentioned in KIP-595, which uses a similar technique but only for the metadata log and the Fetch RPC.
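To illustrate the intended client-side flow, here is a minimal sketch, assuming hypothetical type and method names (LeaderRedirectSketch, onPartitionResponse, retryAgainst, etc.) rather than the actual producer or consumer internals: when a partition response carries a NOT_LEADER_OR_FOLLOWER error together with a newer leader epoch, the client applies the new leader immediately and retries against it instead of waiting for the next metadata refresh.

Code Block
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the client-side flow enabled by this KIP. All names here
// (LeaderRedirectSketch, PartitionKey, retryAgainst, ...) are hypothetical and
// only illustrate the idea; they are not the actual producer/consumer internals.
public final class LeaderRedirectSketch {

    // Stand-ins for the information carried by the new tagged response fields.
    record LeaderAndEpoch(int leaderId, int leaderEpoch) {}
    record Endpoint(int nodeId, String host, int port) {}
    record PartitionKey(String topic, int partition) {}

    static final short NONE = 0;
    static final short NOT_LEADER_OR_FOLLOWER = 6; // Kafka error code 6

    // Client-local view of partition leaders and broker endpoints.
    private final Map<PartitionKey, LeaderAndEpoch> leaders = new ConcurrentHashMap<>();
    private final Map<Integer, Endpoint> endpoints = new ConcurrentHashMap<>();

    /** Invoked for each partition in a ProduceResponse or FetchResponse. */
    void onPartitionResponse(PartitionKey tp, short errorCode,
                             LeaderAndEpoch currentLeader, List<Endpoint> nodeEndpoints) {
        LeaderAndEpoch cached = leaders.get(tp);
        boolean newerLeaderKnown = currentLeader != null
                && (cached == null || currentLeader.leaderEpoch() > cached.leaderEpoch());

        if (errorCode == NOT_LEADER_OR_FOLLOWER && newerLeaderKnown) {
            // Apply the newer leader right away and retry against it, rather than
            // waiting for the next full metadata refresh.
            leaders.put(tp, currentLeader);
            if (nodeEndpoints != null) {
                nodeEndpoints.forEach(e -> endpoints.put(e.nodeId(), e));
            }
            // May be null if no endpoint was provided; a real client would then
            // fall back to its existing metadata for that node.
            retryAgainst(tp, endpoints.get(currentLeader.leaderId()));
        } else if (errorCode != NONE) {
            // Existing behaviour: fall back to a metadata refresh before retrying.
            requestMetadataUpdate();
        }
    }

    private void retryAgainst(PartitionKey tp, Endpoint leader) { /* re-enqueue the batch */ }
    private void requestMetadataUpdate() { /* trigger the usual metadata refresh */ }
}

Note that the sketch only accepts the redirect when the returned leader epoch is newer than the cached one, which guards against applying stale hints out of order.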

...

Code Block
{
  "apiKey": 0,
  "type": "response",
  "name": "ProduceResponse",
  "validVersions": "0-9",
  "flexibleVersions": "9+",
  "fields": [
    { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
      "about": "Each produce response", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name" },
      { "name": "PartitionResponses", "type": "[]PartitionProduceResponse", "versions": "0+",
        "about": "Each partition that we produced to within the topic.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "BaseOffset", "type": "int64", "versions": "0+",
          "about": "The base offset." },
        { "name": "LogAppendTimeMs", "type": "int64", "versions": "2+", "default": "-1", "ignorable": true,
          "about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1.  If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The log start offset." },
        { "name": "RecordErrors", "type": "[]BatchIndexAndErrorMessage", "versions": "8+", "ignorable": true,
          "about": "The batch indices of records that caused the batch to be dropped", "fields": [
          { "name": "BatchIndex", "type": "int32", "versions":  "8+",
            "about": "The batch index of the record that cause the batch to be dropped" },
          { "name": "BatchIndexErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+",
            "about": "The error message of the record that caused the batch to be dropped"}
        ]},
        { "name":  "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable":  true,
          "about":  "The global error message summarizing the common root cause of the records that caused the batch to be dropped"},
		// ---------- Start new field ----------
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "9+", "taggedVersions": "9+", "tag": 0, "fields": [
            { "name": "LeaderId", "type": "int32", "versions": "9+", "default": "-1", "entityType": "brokerId",
              "about": "The ID of the current leader or -1 if the leader is unknown."},
            { "name": "LeaderEpoch", "type": "int32", "versions": "9+", "default": "-1",
              "about": "The latest known leader epoch"}
        ]}
		// ---------- End new field ---------- 
      ]}
    ]},
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
	// ---------- Start new field ---------- 
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "9+", "taggedVersions": "12+", "tag": 0,
      "about": "Endpoints for all current-leaders enumerated in PartitionData.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "9+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
      { "name": "Host", "type": "string", "versions": "9+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "9+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "9+", "nullableVersions": "9+", "ignorable": true, "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
	// ---------- End new field ----------  
  ]
}
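
One design detail worth calling out: NodeEndpoints is a single top-level collection keyed by NodeId ("mapKey": true) rather than a per-partition field, so each new leader is described at most once even when many partitions have moved to it. The sketch below, again with hypothetical names rather than the actual broker code, shows the kind of deduplication this implies when building a response.

Code Block
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch (hypothetical names, not the actual broker code) of collecting
// the top-level NodeEndpoints entries: one endpoint per distinct new leader that is
// referenced anywhere in the response, regardless of how many partitions moved to it.
public final class NodeEndpointsSketch {

    record NewLeaderHint(int nodeId, int leaderEpoch, String host, int port, String rack) {}
    record NodeEndpoint(int nodeId, String host, int port, String rack) {}

    static List<NodeEndpoint> collectEndpoints(List<NewLeaderHint> perPartitionHints) {
        // Keyed by NodeId, mirroring "mapKey": true in the schema above.
        Map<Integer, NodeEndpoint> byNodeId = new LinkedHashMap<>();
        for (NewLeaderHint hint : perPartitionHints) {
            byNodeId.putIfAbsent(hint.nodeId(),
                    new NodeEndpoint(hint.nodeId(), hint.host(), hint.port(), hint.rack()));
        }
        return List.copyOf(byNodeId.values());
    }
}

Because both additions are tagged fields, they add no bytes to responses that do not carry them and are skipped by older clients that do not understand them.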


Benchmark Results

These are the benchmark results for the leader discovery optimization. Performance was tested on low-partition and high-partition workloads; more details on the setup are under Workload Details. We see up to a 5% improvement in E2E latencies with acks=all and up to a 9% improvement in produce latencies with acks=1. Our hypothesis for why the improvement is larger with acks=1 than with acks=all is that, during software upgrades, server-side metadata convergence delays for partition movement are larger than the client-side redirection latency this KIP optimizes, and those server-side delays affect acks=all requests more than acks=1 requests. We believe this is also why the low-partitions workload shows a larger acks=1 improvement than the high-partitions workload. The results are averaged over 2 runs.

Latency improvement of workloads run with acks=all

                                Baseline latency (ms)   Optimized latency (ms)   Improvement
High Partitions   p99 E2E       188                     184                      2.1%
                  p99 Produce   155.65                  151.8                    2.5%
Low Partitions    p99 E2E       393                     374.5                    4.7%
                  p99 Produce   390.95                  374.35                   4.2%

Latency improvement of workloads run with acks=1

                                Baseline latency (ms)   Optimized latency (ms)   Improvement
High Partitions   p99 E2E       106.5                   101                      5.2%
                  p99 Produce   84.7                    83.3                     1.7%
Low Partitions    p99 E2E       12.5                    12.5                     0%
                  p99 Produce   3.25                    2.95                     9.2%

Workload Details

All tests are run on 6 m5.xlarge Apache Kafka brokers, with KRaft as the metadata quorum running on 3 m5.xlarge instances. The clients are 6 m5.xlarge instances running the OpenMessagingBenchmark. Each test runs for 70 minutes, during which the brokers are restarted one by one with a 10-minute interval between restarts. A quick arithmetic check of the ingress and egress figures follows the parameter lists below.

High partitions workload parameters:

  • One 9,000 partition topic

  • ~12 MB/s ingress

    • 512-byte message size

    • 23,400 messages/s distributed over 60 producers

  • ~36 MB/s egress

    • 3 subscriptions, 60 consumers per subscription

Low partitions workload parameters:

  • One 108 partition topic

  • ~120 MB/s ingress

    • 512-byte message size

    • 234,000 messages/s distributed over 6 producers

  • ~360 MB/s egress

    • 3 subscriptions, 6 consumers per subscription
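
As a quick sanity check, the ingress figures above follow from the message size and message rate, and egress is roughly 3x ingress because each of the 3 subscriptions consumes the full stream (MB taken as 10^6 bytes):

Code Block
// Rough arithmetic check of the workload throughput figures listed above.
// MB is taken as 10^6 bytes; egress is ~3x ingress because each of the 3
// subscriptions consumes the full stream.
public final class ThroughputCheck {
    public static void main(String[] args) {
        double messageSizeBytes = 512;

        double highIngress = 23_400 * messageSizeBytes / 1_000_000;   // ~12.0 MB/s ingress
        double highEgress = 3 * highIngress;                          // ~36 MB/s egress

        double lowIngress = 234_000 * messageSizeBytes / 1_000_000;   // ~119.8 MB/s ingress
        double lowEgress = 3 * lowIngress;                            // ~360 MB/s egress

        System.out.printf("high: %.1f MB/s in, %.1f MB/s out; low: %.1f MB/s in, %.1f MB/s out%n",
                highIngress, highEgress, lowIngress, lowEgress);
    }
}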

Compatibility, Deprecation, and Migration Plan

...