...

  • DefaultPartitioner and UniformStickyPartitioner will be deprecated; they'll continue to behave the same way as they do today.
  • Users that don't specify a custom partitioner would get the new behavior automatically.
  • Users that explicitly specify DefaultPartitioner or UniformStickyPartitioner would get a deprecation warning but see no change in behavior. To get the updated behavior, they would need to update the configuration accordingly: remove the partitioner.class setting and optionally set partitioner.ignore.keys to 'true' if replacing UniformStickyPartitioner (see the sketch after this list).
  • Partitioner.onNewBatch will be deprecated.
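
A minimal sketch of the corresponding configuration change (illustrative only; the property names are the ones introduced by this KIP, everything else is assumed):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class PartitionerMigrationExample {
        public static Properties producerProps() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Before: an explicit (now deprecated) partitioner -- remove this setting:
            // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
            //     "org.apache.kafka.clients.producer.UniformStickyPartitioner");

            // After: rely on the built-in logic; to keep ignoring keys when choosing
            // a partition (the old UniformStickyPartitioner behavior), set:
            props.put("partitioner.ignore.keys", "true");
            return props;
        }
    }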

Test Results

Test Setup

3 Kafka brokers on a local machine, one topic with 3 partitions, RF=1, one partition on each broker:

    Topic: foo    Partition: 0    Leader: 1    Replicas: 1    Isr: 1    Offline: 
    Topic: foo    Partition: 1    Leader: 0    Replicas: 0    Isr: 0    Offline: 
    Topic: foo    Partition: 2    Leader: 2    Replicas: 2    Isr: 2    Offline: 

Kafka-0 has a 20ms sleep injected into each produce response.

All Kafka producer settings are at their defaults other than those explicitly mentioned.
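
For reference, the non-default settings used in the runs below, expressed as producer configuration (a sketch; each run applies at most one of the overrides):

    import java.util.Properties;

    public class TestRunConfigs {
        public static Properties runProps(String run) {
            Properties props = new Properties(); // everything else stays at defaults
            if (run.equals("uniform")) {
                // "New Uniform Partitioner" runs: disable adaptive partitioning
                props.put("partitioner.adaptive.partitioning.enable", "false");
            } else if (run.equals("availability")) {
                // "partitioner.availability.timeout.ms=5" run
                props.put("partitioner.availability.timeout.ms", "5");
            }
            // "New Default Partitioner" runs: no overrides at all
            return props;
        }
    }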

Test Series 1

bin/kafka-producer-perf-test.sh --topic foo --num-records 122880 --print-metrics --record-size 512 --throughput 2048 --producer.config producer.properties 

The test produces ~120K records of 512 bytes each (~60MB total) with a throughput throttle of 2048 rec/sec (1MB/sec).

Summary

    Partitioner Config                               Throughput   Avg Latency   P99 Latency   P99.9 Latency
    Old DefaultPartitioner                           0.84 MB/s    4072.74 ms    10992 ms      11214 ms
    partitioner.adaptive.partitioning.enable=false   1 MB/s       49.92 ms      214 ms        422 ms
    New logic with all default settings              1 MB/s       40.06 ms      154 ms        220 ms
    partitioner.availability.timeout.ms=5            1 MB/s       36.29 ms      150 ms        184 ms

Old DefaultPartitioner (current implementation)

122880 records sent, 1724.873666 records/sec (0.84 MB/sec), 4072.74 ms avg latency, 11246.00 ms max latency, 3870 ms 50th, 10221 ms 95th, 10992 ms 99th, 11214 ms 99.9th.

The current implementation favors the slowest broker, because the sticky logic only switches partitions when a new batch is created: batches bound for the slow broker sit in the queue longer and keep accumulating records. The cluster only manages ~0.84 MB/s, so the latency grows over time.

producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0}  : 46826262.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1}  : 9207276.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2}  : 9004193.000

The Kafka-0 broker takes ~5x more bytes than the other 2 brokers, becoming the bottleneck for the cluster and potentially skewing downstream data distribution.

New Uniform Partitioner (partitioner.adaptive.partitioning.enable=false)

122880 records sent, 2043.198484 records/sec (1.00 MB/sec), 49.92 ms avg latency, 795.00 ms max latency, 8 ms 50th, 150 ms 95th, 214 ms 99th, 422 ms 99.9th.

producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0}  : 22237614.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1}  : 21606034.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2}  : 22152273.000

All brokers take roughly the same load. The slowest broker isn't overloaded anymore, so the cluster sustains 1MB/sec and latencies are much better controlled.
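
For intuition, a minimal sketch of the uniform sticky idea behind this mode, assuming a fixed partition count and using batch.size as the switching threshold (the real logic lives inside the producer and also accounts for partition availability):

    import java.util.concurrent.ThreadLocalRandom;

    // Stick to one randomly chosen partition until ~batch.size bytes have been
    // produced to it, then switch to another randomly chosen partition.
    public class UniformStickySketch {
        private final int numPartitions;
        private final int stickyBatchSize; // e.g. 16384, the default batch.size
        private int currentPartition = -1;
        private int producedBytes = 0;

        public UniformStickySketch(int numPartitions, int stickyBatchSize) {
            this.numPartitions = numPartitions;
            this.stickyBatchSize = stickyBatchSize;
        }

        public synchronized int partition(int recordSizeBytes) {
            if (currentPartition < 0 || producedBytes >= stickyBatchSize) {
                // Switch: pick a new partition uniformly at random.
                currentPartition = ThreadLocalRandom.current().nextInt(numPartitions);
                producedBytes = 0;
            }
            producedBytes += recordSizeBytes;
            return currentPartition;
        }
    }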

New Default Partitioner (all settings are default)

122880 records sent, 2045.477245 records/sec (1.00 MB/sec), 40.06 ms avg latency, 817.00 ms max latency, 7 ms 50th, 141 ms 95th, 154 ms 99th, 220 ms 99.9th.

producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0}  : 19244362.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1}  : 23589010.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2}  : 23321818.000

Here the adaptive logic sends less data to the slower broker, so the faster brokers can take more load and latencies improve. The data distribution isn't skewed much, just enough to match load to broker capacity.
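
The adaptive behavior can be pictured with the following sketch (the idea only, not the actual implementation): partition choice is weighted by the inverse of each partition's current queue size, so partitions behind a slower broker are picked less often:

    import java.util.concurrent.ThreadLocalRandom;

    public class AdaptiveChoiceSketch {
        // queueSizes[i] = number of batches currently queued for partition i.
        public static int pickPartition(int[] queueSizes) {
            double[] weights = new double[queueSizes.length];
            double total = 0.0;
            for (int i = 0; i < queueSizes.length; i++) {
                weights[i] = 1.0 / (queueSizes[i] + 1); // +1 avoids division by zero
                total += weights[i];
            }
            // Sample a partition with probability proportional to its weight.
            double r = ThreadLocalRandom.current().nextDouble(total);
            for (int i = 0; i < weights.length; i++) {
                r -= weights[i];
                if (r <= 0.0) return i;
            }
            return queueSizes.length - 1; // guard against floating-point edge cases
        }
    }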

New Default Partitioner with partitioner.availability.timeout.ms=5

122880 records sent, 2044.218196 records/sec (1.00 MB/sec), 36.29 ms avg latency, 809.00 ms max latency, 6 ms 50th, 138 ms 95th, 150 ms 99th, 184 ms 99.9th.

producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0}  : 17431944.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1}  : 25781879.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2}  : 22977244.000

Here the adaptive logic is more responsive to latency and sends even less data to the slower broker, which shifts more of the data onto the faster brokers. Note that it took me a few experiments to find a value that made a difference, which confirms the design decision that this logic should be off by default and only turned on after tuning to a specific configuration and workload.
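
A rough sketch of the availability idea (hypothetical helper; the actual check is internal to the producer): a broker's partitions are skipped for new batches if the broker has ready batches waiting but hasn't accepted a send within the timeout:

    public class AvailabilitySketch {
        // Returns true if the broker may still receive new batches: either nothing is
        // waiting for it, or it accepted a send within the availability timeout.
        public static boolean brokerAvailable(long nowMs, long lastSendMs,
                                              boolean hasReadyBatches,
                                              long availabilityTimeoutMs) {
            return !hasReadyBatches || (nowMs - lastSendMs) <= availabilityTimeoutMs;
        }
    }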

Test Series 2

In this test series, the throughput throttle is increased to 2MB/sec.

bin/kafka-producer-perf-test.sh --topic foo --num-records 122880 --print-metrics --record-size 512 --throughput 4096 --producer.config producer.properties

The test produces ~120K records of 512 bytes each (~60MB total) with a throughput throttle of 4096 rec/sec (2MB/sec).

New Uniform Partitioner (partitioner.adaptive.partitioning.enable=false)

122880 records sent, 3789.317873 records/sec (1.85 MB/sec), 426.24 ms avg latency, 2506.00 ms max latency, 8 ms 50th, 2065 ms 95th, 2408 ms 99th, 2468 ms 99.9th.

producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0}  : 22396882.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1}  : 21652393.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2}  : 21355134.000

Here the brokers take the same load; the slowest broker appears to be maxed out, so the cluster can only sustain 1.85MB/s and latencies grow.

New Default Partitioner (all settings are default)

122880 records sent, 4078.327249 records/sec (1.99 MB/sec), 34.66 ms avg latency, 785.00 ms max latency, 4 ms 50th, 143 ms 95th, 167 ms 99th, 297 ms 99.9th.

producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0}  : 14866064.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1}  : 25176418.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2}  : 25581116.000

The adaptive logic manages to redistribute the load to the faster brokers to sustain the 2MB/sec throughput, and latencies are stable. Note that a bigger skew in the distribution (vs. the 1MB/sec throttle) was needed to adjust load to broker capacity, showing that the adaptive logic applies just enough skew to keep latencies stable.

Rejected Alternatives

KafkaProducer to Explicitly Check for DefaultPartitioner

...