...

The Partitioner.partition method can now return -1 to indicate that the producer itself should make the default partitioning decision.  Previously, Partitioner.partition was required to return a valid partition number.  This departs from the paradigm that partitioning logic (including the default partitioning logic) is fully encapsulated in a partitioner object.  That encapsulation no longer works well, because the default logic now requires information that only the producer (Sender, RecordAccumulator) has, such as queue sizes, record sizes, and broker responsiveness.  See the Rejected Alternatives section for an attempt to preserve encapsulation of the default partitioning logic within a partitioner object.

When the producer gets -1 from the partitioner, it calculates the partition itself.  This way, custom partitioner logic continues to work: the producer uses whatever partition the partitioner returns.  If the partitioner just wants the default partitioning logic, it can return -1 and let the producer pick the partition.
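The -1 contract can be illustrated with a minimal, dependency-free sketch.  `SimplePartitioner`, `KeyHashPartitioner`, `ProducerSketch` and the shortest-queue fallback are hypothetical stand-ins, not Kafka's API or the producer's actual default logic; the key hash uses `String.hashCode` purely for illustration (Kafka's default partitioner hashes the key bytes with murmur2).

```java
// Hypothetical, simplified stand-in for Partitioner.partition (not the Kafka API).
interface SimplePartitioner {
    int UNKNOWN_PARTITION = -1;

    // Returns a partition in [0, numPartitions), or UNKNOWN_PARTITION to defer to the producer.
    int partition(String topic, String key, int numPartitions);
}

// Custom logic for keyed records; defers to the producer for records without a key.
class KeyHashPartitioner implements SimplePartitioner {
    @Override
    public int partition(String topic, String key, int numPartitions) {
        if (key == null) {
            return UNKNOWN_PARTITION;
        }
        // Illustrative hash only; not the hash Kafka uses.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}

// Hypothetical producer-side resolution of the final partition.
class ProducerSketch {
    private final SimplePartitioner partitioner;

    ProducerSketch(SimplePartitioner partitioner) {
        this.partitioner = partitioner;
    }

    // queueSizes[i] = bytes queued for partition i; information only the producer has.
    int resolvePartition(String topic, String key, int[] queueSizes) {
        int p = partitioner.partition(topic, key, queueSizes.length);
        if (p != SimplePartitioner.UNKNOWN_PARTITION) {
            return p; // the custom partitioner made the decision
        }
        return shortestQueue(queueSizes); // the producer decides
    }

    // Placeholder for the producer's default logic: pick the least-loaded partition.
    private static int shortestQueue(int[] queueSizes) {
        int best = 0;
        for (int i = 1; i < queueSizes.length; i++) {
            if (queueSizes[i] < queueSizes[best]) {
                best = i;
            }
        }
        return best;
    }
}
```

With queue sizes `{500, 100, 300}`, an unkeyed record resolves to partition 1, while a keyed record keeps the partition chosen by the custom partitioner.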

This also seems more future-proof than trying to preserve (partial) encapsulation of partitioning logic within the default partitioner: if we support additional signals in the future, we can change the logic in the producer without extending the partitioner interface to pass additional information.

New Configuration

partitioner.sticky.batch.size.  The default would be 0, in which case the batch.size would be used (we can change the default to batch.max.size once KIP-782 is implemented).  See the explanation in the Uniform Sticky Batch Size section.
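The fallback rule for partitioner.sticky.batch.size can be expressed as a small helper.  This is a sketch that assumes the settings arrive as plain `java.util.Properties`; `StickyBatchConfig` and `effectiveStickyBatchSize` are hypothetical names, and 16384 is Kafka's default for batch.size.  Validation of negative values is omitted.

```java
import java.util.Properties;

// Hypothetical helper resolving the proposed partitioner.sticky.batch.size setting.
final class StickyBatchConfig {
    static final int DEFAULT_BATCH_SIZE = 16384; // Kafka's default for batch.size

    private StickyBatchConfig() {}

    static int effectiveStickyBatchSize(Properties props) {
        int sticky = Integer.parseInt(props.getProperty("partitioner.sticky.batch.size", "0"));
        if (sticky > 0) {
            return sticky; // explicitly configured
        }
        // 0 (the proposed default) means: fall back to batch.size.
        return Integer.parseInt(props.getProperty("batch.size", String.valueOf(DEFAULT_BATCH_SIZE)));
    }
}
```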

enable.adaptive.partitioning.  The default would be 'true'.  If it's 'true', the producer will try to adapt to broker performance and produce more messages to partitions hosted on faster brokers.  If it's 'false', the producer will try to distribute messages uniformly.
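The difference between the two modes can be sketched as a weighted random choice.  The weighting below (inverted queue size, `max + 1 - queueSize`, so partitions with shorter queues on faster brokers get more traffic) is an assumption for illustration; this section does not specify the formula.  `AdaptiveChooser` is a hypothetical name, and the caller supplies the random value so the choice is deterministic in tests.

```java
// Hypothetical sketch: pick a partition uniformly or weighted by queue size.
final class AdaptiveChooser {
    private AdaptiveChooser() {}

    /**
     * @param queueSizes  bytes queued per partition (non-empty), a proxy for broker responsiveness
     * @param adaptive    value of the proposed enable.adaptive.partitioning setting
     * @param randomValue a non-negative random int supplied by the caller
     */
    static int choose(int[] queueSizes, boolean adaptive, int randomValue) {
        int n = queueSizes.length;
        if (!adaptive) {
            return randomValue % n; // uniform distribution
        }
        int max = 0;
        for (int q : queueSizes) {
            max = Math.max(max, q);
        }
        // Assumed weighting: every partition gets weight >= 1, shorter queues get more.
        long[] cumulative = new long[n];
        long total = 0;
        for (int i = 0; i < n; i++) {
            total += max + 1L - queueSizes[i];
            cumulative[i] = total;
        }
        long r = randomValue % total;
        for (int i = 0; i < n; i++) {
            if (r < cumulative[i]) {
                return i;
            }
        }
        return n - 1; // not reached: r < total == cumulative[n - 1]
    }
}
```

With queue sizes `{0, 9}` the weights are 10 and 1, so in adaptive mode the idle partition is chosen about ten times as often; in uniform mode both are equally likely.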

...

  • No compatibility, deprecation, or migration plan is needed.  This fixes a problem with the current implementation.
  • Users can continue to use their own partitioners.  If they want to implement a partitioner that switches partitions based on batch creation, they can use the onNewBatch(String topic, Cluster cluster, int prevPartition) method to implement the feature.
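The pattern a custom partitioner would follow with onNewBatch is sketched below without Kafka dependencies.  `BatchSwitchingPartitioner` is hypothetical; it imitates the roles of partition() and onNewBatch() and moves to the next partition round-robin for determinism, whereas Kafka's own sticky partitioner picks the next partition randomly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical imitation of a partitioner that switches partitions only when the
// producer reports that a new batch was created (the role onNewBatch plays).
class BatchSwitchingPartitioner {
    private final Map<String, Integer> current = new HashMap<>();
    private final int numPartitions;

    BatchSwitchingPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Stays on the topic's current partition (starting at 0).
    int partition(String topic) {
        return current.computeIfAbsent(topic, t -> 0);
    }

    // Called on new batch creation; switches only if the batch was for the current
    // partition, so stale notifications do not cause extra switches.
    void onNewBatch(String topic, int prevPartition) {
        Integer cur = current.get(topic);
        if (cur != null && cur == prevPartition) {
            current.put(topic, (prevPartition + 1) % numPartitions);
        }
    }
}
```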

Rejected Alternatives

As an alternative to allowing Partitioner.partition to return -1 to indicate that the producer should execute the default partitioning logic, we considered providing a callback interface that feeds information from the producer back to the partitioner, as follows:

```java
import java.io.Closeable;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;

public interface Partitioner extends Configurable, Closeable {

    /**
     * Callbacks from the producer
     */
    interface Callbacks {
        /**
         * Get the record size in bytes.  The keyBytes and valueBytes may present a skewed view of
         * the number of bytes produced to the partition.  In addition, the callback takes into
         * account the following:
         *  1. Headers
         *  2. Record overhead
         *  3. Batch overhead
         *  4. Compression
         *
         * @param partition The partition we need the record size for
         * @return The record size in bytes
         */
        int getRecordSize(int partition);

        /**
         * Calculate the partition number.  The producer keeps stats on partition load
         * and can use them as a signal for picking the next partition.
         *
         * @return The partition number, or -1 if not implemented or not known
         */
        default int nextPartition() {
            return -1;
        }
    }

    // ... <skip> ...

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param callbacks The record size and partition callbacks (see {@link Partitioner.Callbacks})
     * @param cluster The current cluster metadata
     * @return The partition number
     */
    default int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                          Callbacks callbacks, Cluster cluster) {
        // Falls back to the existing partition method without callbacks (elided above).
        return partition(topic, key, keyBytes, value, valueBytes, cluster);
    }

    // ... <skip> ...
}
```

The getRecordSize callback method is needed to calculate the number of bytes in the record; the information currently available to the partitioner is not enough to calculate it accurately.  It doesn't have to be 100% precise, but it needs to avoid systematic errors that could lead to skew over the long run.  It's fine if, say, the compression ratio is a little off for one batch, because the estimate converges over the long run.  The size should also roughly match the batch size: for example, we have to apply compression estimates if compression is used, otherwise we'll systematically switch partitions before the batch is full.  See also the comments in the code snippet.
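The systematic error described above can be shown with a little arithmetic.  getRecordSize never became part of Kafka, and the formula below is an illustration, not how the producer computes sizes: `RecordSizeEstimator`, its parameters, and the example numbers are all hypothetical.  Batch overhead is amortized over an expected record count, which is itself an estimate.

```java
// Hypothetical estimate of the bytes a record contributes to a batch.
final class RecordSizeEstimator {
    private RecordSizeEstimator() {}

    // Compression is applied to the record body (key, value, headers, per-record overhead);
    // the batch overhead is spread over the expected number of records per batch.
    static double estimate(int keyBytes, int valueBytes, int headerBytes, int recordOverhead,
                           double compressionRatio, int batchOverhead, int recordsPerBatch) {
        double body = (keyBytes + valueBytes + headerBytes + recordOverhead) * compressionRatio;
        return body + (double) batchOverhead / recordsPerBatch;
    }

    // Number of records produced before the byte count reaches stickyBatchSize.
    static int recordsUntilSwitch(int stickyBatchSize, double perRecordBytes) {
        return (int) Math.ceil(stickyBatchSize / perRecordBytes);
    }
}
```

For 1000-byte values compressed at a ratio of 0.5, counting raw bytes against a 16000-byte threshold switches after 16 records, although about 32 compressed records would fill the batch, so every batch would end up roughly half full.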

The nextPartition callback method effectively delegates the partition switching logic back to the producer.
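How a default partitioner would have used nextPartition in the rejected design is sketched below.  `ProducerCallbacks` is a hypothetical top-level copy of the proposed Partitioner.Callbacks, `CallbackDefaultPartitioner` is hypothetical, and the fallback to partition 0 when the producer has no answer is an arbitrary placeholder.

```java
// Hypothetical top-level copy of the proposed Partitioner.Callbacks interface.
interface ProducerCallbacks {
    int getRecordSize(int partition);

    default int nextPartition() {
        return -1;
    }
}

// Hypothetical default partitioner under the rejected callback design.
final class CallbackDefaultPartitioner {
    private CallbackDefaultPartitioner() {}

    static int partition(String key, int numPartitions, ProducerCallbacks callbacks) {
        if (key != null) {
            return Math.floorMod(key.hashCode(), numPartitions); // illustrative hash
        }
        // The decision for unkeyed records comes from the producer's stats.
        int next = callbacks.nextPartition();
        return next >= 0 ? next : 0; // placeholder fallback when the producer has no answer
    }
}
```

Even in this small sketch, the real decision for unkeyed records is made in the producer and the partitioner only forwards it, which is the split the following paragraph criticizes.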

This was an attempt to preserve the role separation between core producer logic and partitioner logic, but in reality it led to a complicated interface (hard to understand the purpose without digging into implementation specifics, and not really useful for other custom partitioners) and to logic that is tightly coupled (hard to understand the partitioner logic without understanding the producer logic, and vice versa) but physically split between the partitioner and the core producer.

After doing that, we realized that the desired encapsulation of the default partitioning logic within the default partitioner was broken anyway, so we might as well hoist the default partitioning logic into the producer and let the default partitioner just inform the producer that the default partitioning logic is desired.  Hoisting the logic into the producer is also slightly more efficient: the split logic required multiple lookups into various maps as it transitioned between the producer and the partitioner, whereas now (with returning -1) the lookup is made once and the logic runs in one go.