Versions Compared

...

So in some sense, the UniformStickyPartitioner is neither uniform nor sufficiently sticky.

Public Interfaces

...

Changed Default Configuration

partitioner.class would have the default value null.  When partitioner.class isn't explicitly set to a custom partitioner class, the producer uses partitioning logic implemented in KafkaProducer.  The DefaultPartitioner and UniformStickyPartitioner are deprecated.  Instead of setting partitioner.class=UniformStickyPartitioner, partitioner.class should be left unset and the partitioner.ignore.keys configuration should be set to 'true'.

New Configuration

partitioner.adaptive.partitioning.enable.  The default would be 'true'.  If it's 'true', the producer will try to adapt to broker performance and produce more messages to partitions hosted on faster brokers.  If it's 'false', the producer will try to assign partitions randomly.

partitioner.availability.timeout.ms.  The default would be 0.  If the value is greater than 0 and adaptive partitioning is enabled, and the broker cannot accept a produce request to the partition for partitioner.availability.timeout.ms milliseconds, the partition is marked as not available.  If the value is 0, this logic is disabled.  This configuration is ignored if adaptive partitioning is disabled via setting partitioner.adaptive.partitioning.enable to 'false'.

partitioner.ignore.keys.  The default would be 'false'.  If it's 'false', the producer uses the message key (if specified) to pick a partition.  If it's 'true', the producer doesn't use message keys to pick a partition, even if they are specified.

Note that the new configuration applies to the partitioning algorithm that is used when partitioner.class is not specified.
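As an illustration of how these settings might be supplied, here is a minimal sketch that builds producer properties using the configuration names from this proposal.  The class and method names are hypothetical, and the keys are plain strings so that the sketch does not depend on the Kafka client library.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: shows the proposed config keys as plain strings.
final class Kip794ConfigSketch {
    static final String IGNORE_KEYS = "partitioner.ignore.keys";
    static final String ADAPTIVE_ENABLE = "partitioner.adaptive.partitioning.enable";
    static final String AVAILABILITY_TIMEOUT_MS = "partitioner.availability.timeout.ms";

    // Replacement for partitioner.class=UniformStickyPartitioner:
    // leave partitioner.class unset and ignore message keys.
    static Properties uniformStickyReplacement() {
        Properties props = new Properties();
        // partitioner.class is deliberately not set, so the built-in logic applies.
        props.setProperty(IGNORE_KEYS, "true");
        return props;
    }

    // Adaptive partitioning with a non-zero availability timeout.
    static Properties adaptiveWithTimeout(long timeoutMs) {
        Properties props = new Properties();
        props.setProperty(ADAPTIVE_ENABLE, "true");
        props.setProperty(AVAILABILITY_TIMEOUT_MS, Long.toString(timeoutMs));
        return props;
    }
}
```

In a real application these properties would be merged with the usual producer settings (bootstrap servers, serializers) before constructing the producer.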

Proposed Changes

This is the default partitioning logic for messages without keys, that substitutes the corresponding logic implemented in the default partitioner.  The default partitioning logic is going to be implemented in KafkaProducer itself.  KafkaProducer would check if the partitioner.class is set to null and implement the default partitioning logic in that case. If partitioner.ignore.keys is set to 'true', then even messages that have keys would be uniformly distributed among partitions.
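The key-handling rule described above can be sketched roughly as follows.  This is an assumption-laden illustration rather than the producer's actual code: the class name, method signature, and the stickyPartition argument are hypothetical, and Arrays.hashCode is only a stand-in for the murmur2 hash that Kafka uses for keyed records.

```java
import java.util.Arrays;

// Hypothetical sketch of the decision made when partitioner.class is not set.
final class DefaultPartitioningDecisionSketch {
    static int choosePartition(byte[] key, boolean ignoreKeys, int numPartitions, int stickyPartition) {
        if (key != null && !ignoreKeys) {
            // Keyed record: the partition is derived from the key.
            // Stand-in hash for illustration only; Kafka uses murmur2 here.
            return Math.floorMod(Arrays.hashCode(key), numPartitions);
        }
        // Unkeyed record, or partitioner.ignore.keys=true: use the current sticky partition.
        return stickyPartition;
    }
}
```

With ignoreKeys set to true, a keyed record takes the same path as an unkeyed one, which is what lets keyed messages be spread uniformly across partitions.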

Uniform Sticky Batch Size

Instead of switching partitions on every batch creation, switch partitions every time batch.size bytes got produced to a partition.  Say we're producing to partition 1.  After 16KB got produced to partition 1, we switch to partition 42.  After 16KB got produced to partition 42, we switch to partition 3.  And so on.  We do this regardless of what happens with batching; we just count the bytes produced to a partition.  This way the distribution would be both uniform (there could be small temporary imbalance) and sticky even if linger.ms=0 because more consecutive records are directed to a partition, allowing it to create better batches.
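The byte-counting idea can be sketched as below.  The class is hypothetical, and picking the next partition uniformly at random among the other partitions is an assumption made for the sketch; the text above only says that the producer switches after batch.size bytes.

```java
import java.util.Random;

// Hypothetical sketch: switch the sticky partition after batchSize bytes were produced to it.
final class ByteCountingStickySketch {
    private final int numPartitions;
    private final int batchSize;
    private final Random random;
    private int current;
    private int producedBytes;

    ByteCountingStickySketch(int numPartitions, int batchSize, long seed) {
        if (numPartitions < 1 || batchSize < 1) {
            throw new IllegalArgumentException("numPartitions and batchSize must be positive");
        }
        this.numPartitions = numPartitions;
        this.batchSize = batchSize;
        this.random = new Random(seed);
        this.current = random.nextInt(numPartitions);
    }

    // Returns the partition for this record, then counts its bytes toward the switch threshold.
    int partitionFor(int recordBytes) {
        int chosen = current;
        producedBytes += recordBytes;
        if (producedBytes >= batchSize) {
            current = nextPartition();
            producedBytes = 0;
        }
        return chosen;
    }

    // Assumption: choose uniformly among the partitions other than the current one.
    private int nextPartition() {
        if (numPartitions == 1) {
            return 0;
        }
        int next = random.nextInt(numPartitions - 1);
        return next >= current ? next + 1 : next;
    }
}
```

With batchSize set to 16384 and 4096-byte records, four consecutive records go to the same partition and the fifth goes to a different one, independent of when batches are actually created or sent.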

...

This information can be easily collected in the RecordAccumulator object, so the KafkaProducer class would calculate the partition information in the RecordAccumulator unless a custom partitioner class is explicitly set.

Adaptive Partition Switching

...

In addition to queue size-based logic, partitioner.availability.timeout.ms can be set to a non-0 value, in which case partitions that have batches ready to be sent for more than partitioner.availability.timeout.ms milliseconds would be marked as not available for partitioning and would not be chosen until the broker is able to accept the next ready batch from the partition.

Adaptive partition switching can be turned off by setting partitioner.adaptive.partitioning.enable = false.
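The availability timeout can be illustrated with a small filter over partitions.  This is only a sketch under stated assumptions: the class, the method, and the readyWaitMs array (how long each partition's oldest ready batch has been waiting to be sent) are hypothetical, and the queue size-based weighting is not shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the partitioner.availability.timeout.ms check.
final class AvailabilityTimeoutSketch {
    // readyWaitMs[p] is how long partition p's oldest ready batch has waited (0 if none is ready).
    static List<Integer> availablePartitions(long[] readyWaitMs, long availabilityTimeoutMs, boolean adaptiveEnabled) {
        // The timeout logic applies only when adaptive partitioning is enabled and the timeout is > 0.
        boolean timeoutActive = adaptiveEnabled && availabilityTimeoutMs > 0;
        List<Integer> available = new ArrayList<>();
        for (int p = 0; p < readyWaitMs.length; p++) {
            boolean timedOut = timeoutActive && readyWaitMs[p] > availabilityTimeoutMs;
            if (!timedOut) {
                available.add(p);
            }
        }
        return available;
    }
}
```

For wait times of 0, 50, and 500 ms with a 100 ms timeout, only partitions 0 and 1 remain eligible; with a timeout of 0 or with adaptive partitioning disabled, all three do.  The sketch returns an empty list if every partition has timed out, and what the producer should do in that case is not covered here.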

From the implementation perspective, the partitioner doesn't know anything about queue sizes or broker readiness (but that information can be gathered in the RecordAccumulator object), so the KafkaProducer class would execute partitioning logic in the RecordAccumulator unless a custom partitioner is explicitly set.

Note that these changes do not affect partitioning for keyed messages, only partitioning for unkeyed messages.

Compatibility, Deprecation, and Migration Plan

  • DefaultPartitioner and UniformStickyPartitioner will be deprecated; they will behave the same way as they do today.
  • Users that don't specify a custom partitioner would get the new behavior automatically.
  • Users that explicitly specify DefaultPartitioner or UniformStickyPartitioner would get a deprecation warning, but see no change in behavior.  They would need to update the configuration accordingly to get the updated behavior (remove the partitioner.class setting and optionally set partitioner.ignore.keys to 'true' if replacing UniformStickyPartitioner).
  • Users can continue to use their own partitioners -- if they want to implement a partitioner that switches partitions based on batch creations, they can use the onNewBatch(String topic, Cluster cluster) method to implement the feature.

Rejected Alternatives

KafkaProducer to Explicitly Check for DefaultPartitioner

KafkaProducer could explicitly check if DefaultPartitioner or UniformStickyPartitioner is used and execute partitioning logic in the RecordAccumulator instead.

This approach was rejected because it was deemed that it would be confusing to have partitioner implementations that are ignored.

Partitioner.partition to return -1

...

This also seems to be more future-proof than trying to preserve (partial) encapsulation of partitioning logic within the default partitioner, because if we support additional signals in the future, we can just change the logic in the producer without extending the partitioner interface to pass additional info.

This approach was rejected because it was deemed that this interface change wouldn't benefit other partitioners.

Callbacks in Partitioner.partition

...