...

So in some sense, the UniformStickyPartitioner is neither uniform nor sufficiently sticky.

Public Interfaces

There is one new method exposed on the partitioner interface.  The new method takes an additional callback that can be used to calculate an accurate record size in bytes.

Code Block
languagejava
    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on (or null)
     * @param valueBytes The serialized value to partition on (or null)
     * @param getRecordSizeCallback The callback that can be used to calculate the record size in bytes for a given partition
     * @param cluster The current cluster metadata
     */
    default int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                          Function<Integer, Integer> getRecordSizeCallback, Cluster cluster) {
        return partition(topic, key, keyBytes, value, valueBytes, cluster);
    }

org.apache.kafka.clients.producer.Partitioner

The Partitioner.partition method can now return -1 to indicate that a default partitioning decision should be made by the producer itself.  Previously, Partitioner.partition was required to return a valid partition number.
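For illustration, here is a minimal sketch of a custom partitioner that relies on this behavior: it handles keyed records itself and returns -1 for unkeyed records so that the producer applies its own default logic.  The class name and the keyed mapping are hypothetical and not part of this KIP.

Code Block
languagejava
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical example, not part of this KIP: a partitioner that only handles
// keyed records and delegates everything else to the producer's default decision.
public class KeyOnlyPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value,
                         byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null)
            return -1;  // no key: let the producer make the default partitioning decision
        int numPartitions = cluster.partitionCountForTopic(topic);
        return (keyBytes[0] & 0xff) % numPartitions;  // toy keyed mapping, for illustration only
    }

    @Override
    public void close() { }
}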

New Configuration

A new producer config setting would be added: partitioner.sticky.batch.size.  The default would be 0, in which case the value of batch.size would be used (we can change the default to batch.max.size once KIP-782 is implemented).

enable.adaptive.partitioning.  The default would be 'true'.  If it's 'true', the producer will try to adapt to broker performance and produce more messages to partitions hosted on faster brokers.  If it's 'false', the producer will try to distribute messages uniformly.

partition.availability.timeout.ms.  The default would be 0.  If the value is greater than 0 and adaptive partitioning is enabled, and the broker cannot accept a produce request to the partition for partition.availability.timeout.ms milliseconds, the partition is marked as not available.  If the value is 0, this logic is disabled.
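For example, a producer could be configured with the proposed settings roughly as follows (config names as proposed above; the values shown are illustrative, not recommendations):

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class NewPartitionerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");

        // Settings proposed by this KIP (example values only).
        props.put("partitioner.sticky.batch.size", "16384");      // 0 (default) falls back to batch.size
        props.put("enable.adaptive.partitioning", "true");        // default 'true'
        props.put("partition.availability.timeout.ms", "1000");   // 0 (default) disables this logic

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... produce records as usual
        }
    }
}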

Proposed Changes

Uniform Sticky Batch Size

Instead of switching partitions on every batch creation, switch partitions every time partitioner.sticky.batch.size bytes have been produced to a partition.  Say we're producing to partition 1.  After 16KB has been produced to partition 1, we switch to partition 42.  After 16KB has been produced to partition 42, we switch to partition 3.  And so on.  We do this regardless of what happens with batching; we just count the bytes produced to a partition.  This way the distribution would be both uniform (with only small temporary imbalances) and sticky even if linger.ms=0, because more consecutive records are directed to a partition, allowing it to create better batches.
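Below is a minimal sketch of this byte-counting switch logic, using a hypothetical helper class (this is not the actual producer code; names are illustrative):

Code Block
languagejava
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch only: stick to one partition until
// partitioner.sticky.batch.size bytes have been produced to it, then switch
// to a new randomly chosen partition, regardless of how records were batched.
class StickyByteCounter {
    private final int stickyBatchSize;   // e.g. 16384 bytes
    private final int numPartitions;
    private int currentPartition;
    private int producedBytes;

    StickyByteCounter(int stickyBatchSize, int numPartitions) {
        this.stickyBatchSize = stickyBatchSize;
        this.numPartitions = numPartitions;
        this.currentPartition = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    // Called for every record; recordSize is the accurate record size in bytes.
    int partitionFor(int recordSize) {
        if (producedBytes >= stickyBatchSize) {
            currentPartition = ThreadLocalRandom.current().nextInt(numPartitions);
            producedBytes = 0;
        }
        producedBytes += recordSize;
        return currentPartition;
    }
}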

...

  1. It's uniform, which is simple to implement and easy to understand.  Intuitively, this is what users expect.

  2. It creates better batches: it adds no linger latency at low production rates, while switching to better batching at high production rates.

  3. It adapts to higher-latency brokers, using larger batches to push data while keeping throughput and data distribution uniform.

  4. It's efficient (logic for selecting partitions doesn't require complex calculations).

Adaptive Partition Switching

One potential disadvantage of strictly uniform partition switching is that if one of the brokers is lagging behind (cannot sustain its share of throughput), the records will keep piling up in the accumulator and will eventually exhaust the buffer pool memory, slowing the production rate down to match the capacity of the slowest broker.  To avoid this problem, the partition switch decision can adapt to broker load.

The queue size of batches waiting to be sent is a direct indication of broker load (more loaded brokers would have longer queues).  Partition switching takes the queue sizes into account when choosing the next partition.  The probability of choosing a partition is proportional to the inverse of its queue size (i.e. partitions with longer queues are less likely to be chosen).

In addition to the queue-size-based logic, partition.availability.timeout.ms can be set to a non-zero value, in which case partitions that have had batches ready to be sent for more than partition.availability.timeout.ms milliseconds would be marked as not available for partitioning and would not be chosen until the broker is able to accept the next ready batch from the partition.
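A rough sketch of how the inverse-queue-size weighting and the availability timeout could be combined when choosing the next partition (illustrative only; the actual producer implementation may differ):

Code Block
languagejava
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch: pick the next partition with probability proportional to
// 1 / (queueSize + 1), skipping partitions that were marked unavailable because
// a ready batch waited longer than partition.availability.timeout.ms.
class AdaptivePartitionChooser {

    int choosePartition(int[] queueSizes, boolean[] available) {
        double[] weights = new double[queueSizes.length];
        double total = 0.0;
        for (int p = 0; p < queueSizes.length; p++) {
            weights[p] = available[p] ? 1.0 / (queueSizes[p] + 1) : 0.0;
            total += weights[p];
        }
        // If every partition is marked unavailable, fall back to a uniform choice.
        if (total == 0.0)
            return ThreadLocalRandom.current().nextInt(queueSizes.length);

        double r = ThreadLocalRandom.current().nextDouble() * total;
        for (int p = 0; p < queueSizes.length; p++) {
            r -= weights[p];
            if (r <= 0.0)
                return p;
        }
        return queueSizes.length - 1;  // guard against floating-point edge cases
    }
}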

Adaptive partition switching can be turned off by setting enable.adaptive.partitioning = false.

Note that these changes do not affect partitioning for keyed messages, only partitioning for unkeyed messages.

Compatibility, Deprecation, and Migration Plan

  • No compatibility, deprecation, or migration plan is needed; this fixes a problem with the current implementation.
  • Users can continue to use their own partitioners; if they want to implement a partitioner that switches partitions based on batch creation, they can use the onNewBatch(String topic, Cluster cluster) method to implement the feature.

Rejected Alternatives

...
