Status

Current state: Accepted (vote thread)

Discussion thread: here

JIRA: here

...

The sticky partitioner attempts to create this behavior in the partitioner. By “sticking” to a partition until the batch is full (or is sent when linger.ms is up), we can create larger batches and reduce latency in the system compared to the default partitioner. Even in the case where linger.ms is 0 and we send right away, we see improved batching and a decrease in latency. After sending a batch, the partition that is sticky changes. Over time, the records should be spread out evenly among all the partitions.
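The sticking behavior described above can be illustrated with a small stand-alone simulation. This is only a sketch under simplifying assumptions: `StickySimulation` and its members are hypothetical names, a batch is treated as full after a fixed number of records, and linger.ms is not modeled. It is not the producer's actual code.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-alone simulation; not the Kafka producer's actual code.
class StickySimulation {
    private final int numPartitions;
    private final int batchCapacity;          // records per batch (assumed fixed)
    private final int[] recordsPerPartition;  // how many records each partition received
    private int stickyPartition;
    private int recordsInCurrentBatch = 0;
    private int batchesSent = 0;

    StickySimulation(int numPartitions, int batchCapacity) {
        this.numPartitions = numPartitions;
        this.batchCapacity = batchCapacity;
        this.recordsPerPartition = new int[numPartitions];
        this.stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    // Assigns one keyless record and returns the partition it went to.
    int send() {
        int partition = stickyPartition;
        recordsPerPartition[partition]++;
        recordsInCurrentBatch++;
        if (recordsInCurrentBatch == batchCapacity) {
            // The batch is full: count it as sent and move the sticky partition.
            batchesSent++;
            recordsInCurrentBatch = 0;
            stickyPartition = nextPartition(partition);
        }
        return partition;
    }

    // Picks a random partition different from the previous one when possible.
    private int nextPartition(int previous) {
        if (numPartitions < 2) {
            return previous;
        }
        int candidate;
        do {
            candidate = ThreadLocalRandom.current().nextInt(numPartitions);
        } while (candidate == previous);
        return candidate;
    }

    int batchesSent() {
        return batchesSent;
    }

    int recordsOn(int partition) {
        return recordsPerPartition[partition];
    }
}
```

Every record in a batch lands on the same partition, and the partition changes only once the batch completes. Because each new sticky partition is chosen at random, the per-partition counts should even out over many batches.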

Netflix had a similar idea and created a sticky partitioner that selects a partition and sends all records to it for a given period of time before switching to a new partition.

...

The sticky partitioner will be part of the default partitioner, so it will not have a separate public interface of its own.

There is one new method exposed on the partitioner interface.


Code Block
languagejava
    /**
     * Executes right before a new batch will be created. For example, if a sticky partitioner is used,
     * this method can change the chosen sticky partition for the new batch.
     *
     * @param topic The topic name
     * @param cluster The current cluster metadata
     * @param prevPartition The partition of the batch that was just completed
     */
    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }


The method onNewBatch will execute code right before a new batch is created. The sticky partitioner will define this method to update the sticky partition. The sticky partition is updated even when the new batch is being created for a keyed record. Test results show that this change will not significantly affect latency in the keyed case.

The default implementation of this method will result in no change to the current partitioning behavior for other user-defined partitioners. If a user wants to implement a sticky partitioner in their own partitioner class, this method can be overridden.
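As a rough illustration of how a user-defined partitioner might override the hook, the sketch below uses a simplified stand-in interface so that it compiles without the Kafka client library. `SketchPartitioner` and `MyStickyPartitioner` are hypothetical names, and a plain partition count takes the place of the `Cluster` argument that the real method receives.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical, simplified stand-in for the partitioner interface.
// Assumption: numPartitions replaces the Cluster metadata argument.
interface SketchPartitioner {
    int partition(String topic, byte[] keyBytes, int numPartitions);

    // No-op by default, so partitioners that do not override it behave as before.
    default void onNewBatch(String topic, int numPartitions, int prevPartition) {
    }
}

// A user-defined partitioner that keeps one sticky partition per topic.
class MyStickyPartitioner implements SketchPartitioner {
    private final Map<String, Integer> stickyByTopic = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, byte[] keyBytes, int numPartitions) {
        // Reuse the current sticky partition, choosing one at random on first use.
        return stickyByTopic.computeIfAbsent(topic,
                t -> ThreadLocalRandom.current().nextInt(numPartitions));
    }

    @Override
    public void onNewBatch(String topic, int numPartitions, int prevPartition) {
        if (numPartitions < 2) {
            return; // nothing else to switch to
        }
        // Pick a different partition than the one whose batch just completed.
        int next;
        do {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        } while (next == prevPartition);
        stickyByTopic.put(topic, next);
    }
}
```

A partitioner that leaves `onNewBatch` alone inherits the empty default, which is why existing implementations see no behavior change.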

...

Change the behavior of the default partitioner in the no explicit partition, key = null case. Choose the “sticky” partition for the given topic. The “sticky” partition is changed when the record accumulator is allocating a new batch for a topic on a given partition.

These changes will slightly modify the code path for records that have keys as well, but the changes will not affect latency significantly.
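The three cases for the default partitioner can be summarized in a short sketch. `DefaultPartitionChoice` is a hypothetical helper, and `Arrays.hashCode` is only a stand-in for whatever key hash the default partitioner actually applies.

```java
import java.util.Arrays;

// Hypothetical helper summarizing the default partitioner's decision order.
class DefaultPartitionChoice {
    static int choose(Integer explicitPartition, byte[] keyBytes,
                      int numPartitions, int stickyPartition) {
        if (explicitPartition != null) {
            // An explicit partition always wins.
            return explicitPartition;
        }
        if (keyBytes != null) {
            // Keyed records are still placed by key hash.
            // Assumption: Arrays.hashCode stands in for the real hash function.
            return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
        }
        // No explicit partition and no key: use the topic's sticky partition.
        return stickyPartition;
    }
}
```

Only the last branch is new behavior. Keyed records keep a stable partition regardless of which partition is currently sticky.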

A new partitioner called UniformStickyPartitioner will be created to allow sticky partitioning on all records, even those that have non-null keys. This will mirror how the RoundRobinPartitioner uses the round robin partitioning strategy for all records, including those with keys.
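Opting in to the new partitioner would presumably go through the producer's `partitioner.class` setting, as with any other partitioner. The sketch below builds such a configuration with plain `java.util.Properties`. `PartitionerConfigSketch` is a hypothetical name, and the fully qualified class name is an assumption that the class sits in the same package as RoundRobinPartitioner.

```java
import java.util.Properties;

// Hypothetical helper that builds producer properties selecting the new partitioner.
class PartitionerConfigSketch {
    static Properties uniformStickyProps() {
        Properties props = new Properties();
        // "partitioner.class" is the producer setting that selects a partitioner.
        // Assumption: the package name below matches where the class ends up.
        props.setProperty("partitioner.class",
                "org.apache.kafka.clients.producer.UniformStickyPartitioner");
        return props;
    }
}
```

With this setting, records with non-null keys would also follow the sticky partition, so it is only appropriate when per-key ordering on a single partition is not required.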

Compatibility, Deprecation, and Migration Plan

...

| Parameter | Value |
|---|---|
| EC2 Instance | m3.xlarge |
| Disk Type | SSD |
| Duration of Test | 12 minutes |
| Number of Brokers | 3 |
| Number of Producers | 3 |
| Replication Factor | 3 |
| Active Topics | 4 |
| Inactive Topics | 1 |
| Linger.ms | 0 |
| Acks | All |
| keyGenerator | {"type": "null"} |
| useConfiguredPartitioner | True |
| No Flushing on Throttle (skipFlush) | True |

...