Status
Current state: Accepted (vote thread)
Discussion thread: here
...
There is one new method exposed on the partitioner interface.

```java
/**
 * Executes right before a new batch will be created. For example, if a sticky partitioner is used,
 * this method can change the chosen sticky partition for the new batch.
 *
 * @param topic The topic name
 * @param cluster The current cluster metadata
 * @param prevPartition The partition of the batch that was just completed
 */
default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
```

The method onNewBatch will execute code right before a new batch is created. The sticky partitioner will override this method to update the sticky partition. This includes changing the sticky partition even when the new batch is created for a record that has a key. Test results show that this change will not significantly affect latency in the keyed case.
The default of this method will result in no change to the current partitioning behavior for other user-defined partitioners. If a user wants to implement a sticky partitioner in their own partitioner class, this method can be overridden.
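The hook can be illustrated with a small, self-contained sketch. It does not use the real kafka-clients types. `SketchPartitioner`, `StickyByBatchPartitioner`, `ToyAccumulator`, and `OnNewBatchDemo` are hypothetical names. A partition count stands in for the `Cluster` argument, and a batch is assumed to be full after a fixed number of records. The producer side asks for a partition, calls `onNewBatch` right before a new batch would be allocated, and then asks for the partition again. That re-query is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the partitioner interface (not the real
// org.apache.kafka.clients.producer.Partitioner): a partition count replaces Cluster.
interface SketchPartitioner {
    int partition(String topic, Object key, int numPartitions);

    // Mirrors the proposed default method: a no-op, so partitioners that do not
    // override it behave exactly as before.
    default void onNewBatch(String topic, int numPartitions, int prevPartition) {
    }
}

// Hypothetical user-defined sticky partitioner that overrides onNewBatch.
class StickyByBatchPartitioner implements SketchPartitioner {
    private final Map<String, Integer> sticky = new HashMap<>();

    @Override
    public int partition(String topic, Object key, int numPartitions) {
        // Starts at partition 0 and rotates, to keep the sketch deterministic.
        return sticky.computeIfAbsent(topic, t -> 0);
    }

    @Override
    public void onNewBatch(String topic, int numPartitions, int prevPartition) {
        // Move on only if the batch that filled up belongs to the current sticky partition.
        if (sticky.getOrDefault(topic, prevPartition) == prevPartition) {
            sticky.put(topic, (prevPartition + 1) % numPartitions);
        }
    }
}

// Hypothetical toy accumulator: each partition has one open batch of at most batchSize records.
class ToyAccumulator {
    private final SketchPartitioner partitioner;
    private final int numPartitions;
    private final int batchSize;
    private final Map<Integer, Integer> openBatchSize = new HashMap<>();
    final List<Integer> assigned = new ArrayList<>();

    ToyAccumulator(SketchPartitioner partitioner, int numPartitions, int batchSize) {
        this.partitioner = partitioner;
        this.numPartitions = numPartitions;
        this.batchSize = batchSize;
    }

    void send(String topic, Object key) {
        int p = partitioner.partition(topic, key, numPartitions);
        if (openBatchSize.getOrDefault(p, 0) >= batchSize) {
            // Appending here would create a new batch: notify the partitioner first,
            // then ask it for the partition again.
            partitioner.onNewBatch(topic, numPartitions, p);
            p = partitioner.partition(topic, key, numPartitions);
        }
        int count = openBatchSize.getOrDefault(p, 0);
        // A full batch is replaced by a new one that starts with this record.
        openBatchSize.put(p, count >= batchSize ? 1 : count + 1);
        assigned.add(p);
    }
}

class OnNewBatchDemo {
    public static void main(String[] args) {
        ToyAccumulator acc = new ToyAccumulator(new StickyByBatchPartitioner(), 3, 2);
        for (int i = 0; i < 6; i++) {
            acc.send("t", null);
        }
        System.out.println(acc.assigned); // [0, 0, 1, 1, 2, 2]
    }
}
```

Some partitioners do not override the method, such as the lambda `(topic, key, n) -> 2`. They keep every record on partition 2 across batch boundaries, which is the unchanged behavior the default is meant to preserve.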
...
Change the behavior of the default partitioner for records that have no explicit partition and a null key: choose the “sticky” partition for the given topic. The sticky partition changes when the record accumulator is about to allocate a new batch for the topic on a given partition.
These changes will slightly modify the code path for records that have keys as well, but the changes will not affect latency significantly.
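The default partitioner's decision described above is shown here as a minimal sketch, not Kafka's DefaultPartitioner code. `StickyCache`, `SketchDefaultPartitioner`, and `DefaultPartitionerDemo` are hypothetical names. Keyed records use `Arrays.hashCode` on the key bytes as a stand-in for the producer's murmur2 hash. The sticky choice is random, using a seeded `Random` so runs are reproducible, and all partitions are assumed to be available. Ignoring a notification whose `prevPartition` is no longer the sticky partition is a design choice of this sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical per-topic cache of the current sticky partition.
class StickyCache {
    private final Map<String, Integer> current = new HashMap<>();
    private final Random random;

    StickyCache(long seed) {
        this.random = new Random(seed); // seeded only to make runs reproducible
    }

    int partition(String topic, int numPartitions) {
        Integer p = current.get(topic);
        return p != null ? p : nextPartition(topic, numPartitions, -1);
    }

    // Called right before a new batch is created for prevPartition.
    int nextPartition(String topic, int numPartitions, int prevPartition) {
        Integer old = current.get(topic);
        // Switch only if the sticky partition is still the one whose batch filled up;
        // otherwise an earlier notification has already moved it.
        if (old == null || old == prevPartition) {
            int next = random.nextInt(numPartitions);
            while (numPartitions > 1 && next == prevPartition) {
                next = random.nextInt(numPartitions); // avoid reusing the full partition
            }
            current.put(topic, next);
            return next;
        }
        return old;
    }
}

// Hypothetical simplified default partitioner.
class SketchDefaultPartitioner {
    private final StickyCache cache;

    SketchDefaultPartitioner(StickyCache cache) {
        this.cache = cache;
    }

    int partition(String topic, byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // No explicit partition and key == null: use the topic's sticky partition.
            return cache.partition(topic, numPartitions);
        }
        // Keyed records are still hashed (Arrays.hashCode stands in for murmur2).
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    void onNewBatch(String topic, int numPartitions, int prevPartition) {
        cache.nextPartition(topic, numPartitions, prevPartition);
    }
}

class DefaultPartitionerDemo {
    public static void main(String[] args) {
        SketchDefaultPartitioner dp = new SketchDefaultPartitioner(new StickyCache(42L));
        int first = dp.partition("t", null, 4);
        dp.onNewBatch("t", 4, first);
        int second = dp.partition("t", null, 4);
        System.out.println(first + " -> " + second); // two different partitions
    }
}
```

Keyed records always map to the same partition. Null-key records stay on one partition until a new batch is created for it.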
A new partitioner called UniformStickyPartitioner will be created to allow sticky partitioning on all records, even those that have non-null keys. This will mirror how the RoundRobinPartitioner uses the round robin partitioning strategy for all records, including those with keys.
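The two key-agnostic strategies can be contrasted with a toy comparison. `KeyAgnosticStrategies`, `roundRobin`, and `uniformSticky` are hypothetical helpers, not Kafka's RoundRobinPartitioner or UniformStickyPartitioner. A new batch is assumed to start every `batchSize` records, and the sticky partition rotates instead of being chosen randomly so the output is deterministic. Both helpers ignore the key entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers illustrating two strategies that ignore record keys.
class KeyAgnosticStrategies {
    // Round robin: every record, keyed or not, goes to the next partition.
    static List<Integer> roundRobin(List<String> keys, int numPartitions) {
        List<Integer> out = new ArrayList<>();
        int counter = 0;
        for (String ignoredKey : keys) {
            out.add(counter++ % numPartitions);
        }
        return out;
    }

    // Uniform sticky: every record, keyed or not, goes to the sticky partition,
    // which changes only when a batch of batchSize records has been filled.
    static List<Integer> uniformSticky(List<String> keys, int numPartitions, int batchSize) {
        List<Integer> out = new ArrayList<>();
        int sticky = 0;
        int inBatch = 0;
        for (String ignoredKey : keys) {
            if (inBatch == batchSize) {
                // Equivalent of onNewBatch: move to another partition for the new batch.
                sticky = (sticky + 1) % numPartitions;
                inBatch = 0;
            }
            out.add(sticky);
            inBatch++;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d", "e", "f");
        System.out.println(roundRobin(keys, 3));       // [0, 1, 2, 0, 1, 2]
        System.out.println(uniformSticky(keys, 3, 2)); // [0, 0, 1, 1, 2, 2]
    }
}
```

The example uses three partitions and two records per batch. Round robin sends consecutive keyed records to different partitions, while the uniform sticky strategy fills one partition's batch before moving on.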
...
Parameter | Value |
---|---|
EC2 Instance | m3.xlarge |
Disk Type | SSD |
Duration of Test | 12 minutes |
Number of Brokers | 3 |
Number of Producers | 3 |
Replication Factor | 3 |
Active Topics | 4 |
Inactive Topics | 1 |
linger.ms | 0 |
acks | all |
keyGenerator | {"type": "null"} |
useConfiguredPartitioner | True |
No Flushing on Throttle (skipFlush) | True |
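The producer-side rows of the table map onto standard producer configuration keys, as the following minimal sketch shows. `BenchmarkProducerConfig` is a hypothetical name, and `localhost:9092` is a placeholder bootstrap address. The cluster, topic, and workload rows (brokers, replication factor, keyGenerator, useConfiguredPartitioner, skipFlush) are not producer configs and are left out. This sketch assumes that useConfiguredPartitioner = True means the producer's own partitioner is used, so `partitioner.class` is left unset and the default partitioner applies.

```java
import java.util.Properties;

// Hypothetical helper: producer settings taken from the test table.
class BenchmarkProducerConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("linger.ms", "0"); // no artificial delay before sending a batch
        props.setProperty("acks", "all");    // leader waits for all in-sync replicas
        // partitioner.class is deliberately unset, so the default partitioner is used.
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        System.out.println(props.getProperty("linger.ms") + " " + props.getProperty("acks")); // 0 all
    }
}
```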
...