
Status

Current state: Accepted (vote thread)

Discussion thread: here

...

There is one new method exposed on the Partitioner interface.

```java
/**
 * Executes right before a new batch will be created. For example, if a sticky partitioner is used,
 * this method can change the chosen sticky partition for the new batch.
 *
 * @param topic The topic name
 * @param cluster The current cluster metadata
 * @param prevPartition The partition of the batch that was just completed
 */
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
```


The method onNewBatch will execute code right before a new batch is created. The sticky partitioner will define this method to update the sticky partition. The sticky partition is changed even when the new batch is created for a record with a key. Test results show that this change will not significantly affect latency in the keyed case.

The default implementation of this method does nothing, so the partitioning behavior of other user-defined partitioners does not change. Users who want sticky partitioning in their own partitioner class can override this method.
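
A minimal sketch of how a partitioner might use onNewBatch to implement sticky partitioning. It assumes a hypothetical SketchPartitioner interface that replaces the real Cluster and serialized key/value arguments with a plain partition count, so it runs without the Kafka client library. StickySketchPartitioner is likewise hypothetical, and Arrays.hashCode stands in for the murmur2 hash that the default partitioner applies to keys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical, simplified stand-in for the producer Partitioner interface:
// a partition count replaces the Cluster and serialized key/value arguments.
interface SketchPartitioner {
    int partition(String topic, byte[] keyBytes, int numPartitions);

    // Mirrors the proposed default: a no-op, so existing partitioners are unaffected.
    default void onNewBatch(String topic, int numPartitions, int prevPartition) {
    }
}

// Sticky behavior for records without a key; keyed records are hashed as before.
class StickySketchPartitioner implements SketchPartitioner {
    private final Map<String, Integer> stickyPartitions = new HashMap<>();
    private final Random random;

    StickySketchPartitioner(Random random) {
        this.random = random;
    }

    @Override
    public int partition(String topic, byte[] keyBytes, int numPartitions) {
        if (keyBytes != null) {
            // Arrays.hashCode is an assumed stand-in for murmur2; the mask keeps it non-negative.
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        }
        // Reuse the topic's current sticky partition, choosing one on first use.
        return stickyPartitions.computeIfAbsent(topic, t -> random.nextInt(numPartitions));
    }

    @Override
    public void onNewBatch(String topic, int numPartitions, int prevPartition) {
        Integer current = stickyPartitions.get(topic);
        // Only move on if the batch that just completed belonged to the sticky partition.
        if (current == null || current != prevPartition || numPartitions < 2) {
            return;
        }
        // Pick a different partition uniformly at random among the remaining ones.
        int next = random.nextInt(numPartitions - 1);
        if (next >= prevPartition) {
            next++;
        }
        stickyPartitions.put(topic, next);
    }
}
```

A partitioner that only implements partition inherits the empty onNewBatch and keeps its current behavior. The sticky sketch changes partition only when the completed batch was on its current sticky partition, so a new batch opened for a keyed record on some other partition leaves the sticky choice alone.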

Proposed Changes

Change the behavior of the default partitioner in the case where no explicit partition is given and the key is null: choose the “sticky” partition for the given topic. The “sticky” partition is changed when the record accumulator is allocating a new batch for a topic on a given partition.
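
The producer-side flow behind this change can be sketched under heavy simplifying assumptions: a single topic, batches measured in records rather than bytes (BATCH_CAPACITY is an arbitrary illustrative value), and a partition with no open batch treated like one whose batch is full. SendFlowSketch and its methods are hypothetical. The append-then-retry shape (abort the append if it would create a new batch, call onNewBatch, recompute the partition, append again) is an assumption about how the record accumulator and partitioner cooperate, and the "advance to the next partition" rule is a deterministic stand-in for however the real partitioner picks its next sticky partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the send path for records without keys.
class SendFlowSketch {
    static final int BATCH_CAPACITY = 3; // records per batch (illustrative assumption)

    private final int numPartitions;
    private int stickyPartition = 0;                         // sticky partition for the one topic
    private final Map<Integer, Integer> openBatchSizes = new HashMap<>();
    final List<Integer> createdBatches = new ArrayList<>();  // partition of each batch created

    SendFlowSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Partition choice for a record with no key and no explicit partition.
    int partition() {
        return stickyPartition;
    }

    // Called right before a new batch is created: move the sticky partition on.
    void onNewBatch(int prevPartition) {
        if (prevPartition == stickyPartition) {
            stickyPartition = (stickyPartition + 1) % numPartitions;
        }
    }

    // Returns false when the record would need a new batch and the caller asked to abort.
    // A missing batch counts as full: either way the next record needs a new batch.
    boolean append(int partition, boolean abortOnNewBatch) {
        int size = openBatchSizes.getOrDefault(partition, BATCH_CAPACITY);
        if (size < BATCH_CAPACITY) {
            openBatchSizes.put(partition, size + 1);
            return true;
        }
        if (abortOnNewBatch) {
            return false;
        }
        openBatchSizes.put(partition, 1);
        createdBatches.add(partition);
        return true;
    }

    // Try to append; if that would open a new batch, let the partitioner react first,
    // recompute the partition, and append again without aborting.
    int send() {
        int partition = partition();
        if (!append(partition, true)) {
            onNewBatch(partition);
            partition = partition();
            append(partition, false);
        }
        return partition;
    }
}
```

With three partitions, seven calls to send land on partitions 1, 1, 1, 2, 2, 2, 0. Even the first record triggers onNewBatch, because no batch is open yet; after that, the sticky partition only moves when its batch is full.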

These changes will slightly modify the code path for records that have keys as well, but the changes will not affect latency significantly.

A new partitioner called UniformStickyPartitioner will be created to allow sticky partitioning on all records, even those that have non-null keys. This will mirror how the RoundRobinPartitioner uses the round robin partitioning strategy for all records, including those with keys.
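
To illustrate why sticking to one partition can reduce the number of batches compared with round robin, here is a toy simulation in which both strategies ignore keys, as the RoundRobinPartitioner and the proposed UniformStickyPartitioner do. It is not the benchmark from this KIP: KeylessStrategiesSketch is hypothetical, batches never fill up, and a sender drains every open batch after each drainEvery records as a crude stand-in for linger.ms=0 with a busy sender thread.

```java
// Hypothetical toy simulation comparing round robin with uniform sticky partitioning.
class KeylessStrategiesSketch {
    static int batchesSent(boolean sticky, int records, int numPartitions, int drainEvery) {
        int[] open = new int[numPartitions]; // records in each partition's open batch (0 = none)
        int stickyPartition = 0;
        int sent = 0;
        for (int i = 0; i < records; i++) {
            int p;
            if (sticky) {
                // Appending here would create a new batch, so switch partition first.
                if (open[stickyPartition] == 0) {
                    stickyPartition = (stickyPartition + 1) % numPartitions;
                }
                p = stickyPartition;
            } else {
                p = i % numPartitions; // round robin, regardless of key
            }
            open[p]++;
            if ((i + 1) % drainEvery == 0) {
                sent += drain(open);
            }
        }
        return sent + drain(open); // send whatever is still open at the end
    }

    // Sends every non-empty batch and returns how many batches were sent.
    static int drain(int[] open) {
        int sent = 0;
        for (int q = 0; q < open.length; q++) {
            if (open[q] > 0) {
                sent++;
                open[q] = 0;
            }
        }
        return sent;
    }
}
```

With 12 records, 3 partitions and a drain every 3 records, round robin sends 12 one-record batches while the sticky strategy sends 4 three-record batches. In a real producer, opting into the new partitioner would mean setting the partitioner.class producer config to org.apache.kafka.clients.producer.UniformStickyPartitioner.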

...

| Parameter | Value |
|---|---|
| EC2 Instance | m3.xlarge |
| Disk Type | SSD |
| Duration of Test | 12 minutes |
| Number of Brokers | 3 |
| Number of Producers | 3 |
| Replication Factor | 3 |
| Active Topics | 4 |
| Inactive Topics | 1 |
| linger.ms | 0 |
| acks | all |
| keyGenerator | {"type": "null"} |
| useConfiguredPartitioner | True |
| No Flushing on Throttle (skipFlush) | True |

...