Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • A single Metadata RPC can be slow (order of few 100 milliseconds in extreme cases).

    • Digging deeper into this we found that head of the line blocking from slow produce requests preventing responses from being sent is the main source of these slow metadata RPC's.
  • It might take multiple attempts to refresh the metadata if the latest metadata hasn’t yet propagated throughout the cluster.

  • The produce and fetch requests have their own delayed retries on failed attempts (default is 100ms in Java client), to avoid overloading the Kafka brokers with failing RPCs.

...

When the leader of a partition changes from the old leader to a new leader, the old leader can inform the client of the new leader’s LeaderEpoch & LeaderId via ProduceResponse and FetchResponse if it has that information. Notably, this information is sent together with the existing error codes of NOT_LEADER_OR_FOLLOWER and FENCED_LEADER_EPOCH. The new leader information is obtained from either the replica partition state (if the receiving broker continues to be a replica of the partition) or from the broker’s metadata cache (if the receiving broker is not a replica for the partition because of reassignment). These new leader fields will be optional (tagged), if the old leader does not have this information for any reason it does not need to populate them and the client would do a full metadata refresh.

Client

The client will only accept the new leader information(LeaderId & LeaderEpoch) only if it advances its view of the new leader(i.e. new-leader’s Epoch should be greater than what client knows already) and use it in subsequent retries of the Produce & Fetch requests. On the client, it can happen that the subsequent metadata refreshes return stale leader information, if the latest metadata isn’t yet fully propagated to the entire cluster. The client will make sure that new leader information isn’t overridden by the stale leader’s information(again comparing LeaderEpochs), which is the existing behaviour of Kafka Java client.

...

For Produce, if new leader info is available in the response along with errors(NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH) that advances client's view of the leadership, client would no longer back off up to RETRY_BACKOFF_MS_CONFIG before retrying the failed batch. This immediate retry is appealing as the client is going to retry on a different broker and it is likely to succeed because it is retrying on a newer leader. On the other hand, subsequent retries to the same new LeaderEpoch should LeaderEpoch should still continue to be subject to clients' backoff strategy.

...

These are the benchmark results for leader discovery optimization. Two sets of tests were performed, a shorter micro-benchmark targeting redirection more directly and a longer running benchmark simulating a roll.

Micro Benchmark

Performance was tested using the kafka-producer-perf-test.sh script and reassigning leadership of all partitions of a 100 partition topic. We see an end-to-end reduction in the p99.9 produce latency of the overall run of 88%, from 1675ms to 215ms (average of 3 runs). We hypothesize the residual latency is due to metadata convergence on the servers, this is evident in the results for the rejected alternative, which performs a full metadata refresh but eliminates the retry backoff to the new leader. This experiment showed an average latency of 3022ms which is higher than the baseline, we hypothesize this is due to the high variance server side convergence introduces to metadata latency.

Baseline

Run 1: 40000000 records sent, 99997.750051 records/sec (95.37 MB/sec), 12.56 ms avg latency, 8087.00 ms max latency, 6 ms 50th, 8 ms 95th, 12 ms 99th, 2967 ms 99.9th.
Run 2: 40000000 records sent, 99998.250031 records/sec (95.37 MB/sec), 15.51 ms avg latency, 11652.00 ms max latency, 6 ms 50th, 8 ms 95th, 13 ms 99th, 859 ms 99.9th.
Run 3: 40000000 records sent, 99998.000040 records/sec (95.37 MB/sec), 8.63 ms avg latency, 3224.00 ms max latency, 6 ms 50th, 8 ms 95th, 14 ms 99th, 1201 ms 99.9th.

KIP-951

Run 1: 40000000 records sent, 99998.000040 records/sec (95.37 MB/sec), 8.51 ms avg latency, 2949.00 ms max latency, 6 ms 50th, 8 ms 95th, 15 ms 99th, 346 ms 99.9th.
Run 2: 40000000 records sent, 99998.000040 records/sec (95.37 MB/sec), 15.11 ms avg latency, 11118.00 ms max latency, 6 ms 50th, 8 ms 95th, 12 ms 99th, 174 ms 99.9th.
Run 3: 40000000 records sent, 99997.500062 records/sec (95.37 MB/sec), 11.71 ms avg latency, 6933.00 ms max latency, 6 ms 50th, 8 ms 95th, 15 ms 99th, 125 ms 99.9th.

Rejected Alternative

Run 1: 40000000 records sent, 99997.500062 records/sec (95.37 MB/sec), 9.77 ms avg latency, 6756.00 ms max latency, 6 ms 50th, 8 ms 95th, 14 ms 99th, 1781 ms 99.9th.
Run 2: 40000000 records sent, 99997.750051 records/sec (95.37 MB/sec), 11.07 ms avg latency, 7409.00 ms max latency, 5 ms 50th, 7 ms 95th, 11 ms 99th, 1934 ms 99.9th.
Run 3: 40000000 records sent, 99997.750051 records/sec (95.37 MB/sec), 16.26 ms avg latency, 14211.00 ms max latency, 6 ms 50th, 9 ms 95th, 16 ms 99th, 5352 ms 99.9th.

Workload Details

All tests are run on 6 m5.xlarge Apache Kafka brokers running with Kraft as the metadata quorum in 3 m5.xlarge instances. The client is a m5.xlarge instance running the kafka-producer-perf-test.sh script with the following parameters. The test lasts for around 6 minutes, during which all partitions of the 100 partition test topic are reassigned by rotating the replica set from the previous leader to the next in-line replica.

./bin/kafka-producer-perf-test.sh --producer.config ... --throughput 100000 --record-size 1000  --num-records 40000000 --topic ...  --producer-props acks=all linger.ms=0 batch.size=16384 --print-metrics

Roll Simulation

Performance was tested Performance was tested on low partition and high partition workloads, more details on the setup are under Workload Details. We see up to 5% improvement in E2E latencies when run with acks=all and up to 9% improvement in produce latencies when run with acks=1. Our hypothesis for why the improvement when run with acks=1 is higher than acks=all is that metadata convergence delays for partition movement on the server side during software upgrades are higher than the client side redirection in the KIP which impacts ack=all requests more than ack=1. We believe this is also the reason why low partitions workload shows better improvements in ack=1 than high partitions workload. The results are averaged over 2 runs.

...


Baseline latency (ms)
avg (run1, run2)

Optimized latency (ms)
avg (run1, run2)

Improvement

High Partitions

p99 E2E

106.5 (111, 102)

101 (104, 98)

5.2%

p99 Produce

84.7 (85.8, 83.6)

83.3 (82.5, 84.1)

1.7%

Low Partitions

p99 E2E

12.5 (13, 12)

12.5 (11, 14)

0%

p99 Produce

3.25 (3.3, 3.2)

2.95 (3, 2.9)

9.2%

Workload Details

All tests are run on 6 m5.xlarge Apache Kafka brokers running with Kraft as the metadata quorum in 3 m5.xlarge instances. The clients are 6 m5.xlarge instances running the OpenMessagingBenchmark. The test is run for 70 minutes, during which the brokers are restarted one by one with a 10 minute interval between restarts.

...

  1. Total Time for alternative = Produce RPC(client to old leader) + Time taken to refresh metadata to get new eader leader + Produce RPC(client to new leader)
  2. Total Time for the favored proposed changes = Produce RPC(client to old leader, ProduceResponse has new leader) +  Produce RPC(client to new leader)

It can be clearly seen alternative has an extra-component, i.e.  time taken to refresh metadata to get new eaderleader. This time has a lower bound of 1 single Metadata RPC call, but degrades to many such calls if metadata propagation is slower through the cluster. Due to this, proposed changes, is the preferred alternative.