...

  1. It's uniform, which is simple to implement and easy to understand.  Intuitively, this is what users expect.

  2. It creates better batches, adding no linger latency at low production rates while switching to better batching at high production rates (a sketch of the switching rule follows this list).

  3. It adapts to higher-latency brokers, using larger batches to push data while keeping throughput and data distribution uniform.

  4. It's efficient (logic for selecting partitions doesn't require complex calculations).
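
To make the switching rule concrete, below is a minimal sketch of strictly uniform partition switching (the class and method names are hypothetical; this is not the actual producer code): records stick to one partition until roughly batch.size bytes have been produced to it, after which a new partition is chosen at random. The sketch assumes the per-record byte count is readily available; as discussed below, gathering that count accurately is exactly why the logic belongs in the producer / RecordAccumulator rather than in the partitioner.

    import java.util.concurrent.ThreadLocalRandom;

    // Minimal sketch of strictly uniform partition switching; names are
    // hypothetical and the real logic would live in the producer / accumulator.
    public class UniformStickySwitcher {
        private final int batchSize;      // producer batch.size in bytes
        private final int numPartitions;  // partition count of the target topic
        private int currentPartition = -1;
        private int producedBytes = 0;

        public UniformStickySwitcher(int batchSize, int numPartitions) {
            this.batchSize = batchSize;
            this.numPartitions = numPartitions;
        }

        // Returns the partition for the next record, switching to a new random
        // partition once roughly batch.size bytes have been produced to the
        // current one.  Every partition is equally likely to be chosen, so the
        // data distribution stays uniform.
        public int partitionFor(int recordSizeBytes) {
            if (currentPartition < 0 || producedBytes >= batchSize) {
                int next;
                do {
                    next = ThreadLocalRandom.current().nextInt(numPartitions);
                } while (numPartitions > 1 && next == currentPartition);
                currentPartition = next;
                producedBytes = 0;
            }
            producedBytes += recordSizeBytes;
            return currentPartition;
        }
    }

Because a switch happens only after batch.size bytes, a low production rate still fills one batch at a time with no extra linger latency, while a high production rate naturally produces full batches per partition.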

From the implementation perspective, the partitioner doesn't have enough information to calculate the number of bytes in the record; specifically, it is missing:

  • Headers
  • Compression info
  • Batch headers info (one partition could reside on a faster broker and get more, smaller batches; we need to account for that to achieve uniformity)
  • Record overhead (due to varint optimization). Not sure if it matters, but it seems easier to implement than to prove that it doesn't.

This information can easily be collected in the RecordAccumulator object, so we'd either need to call back there when computing the number of bytes for the record (rejected approach) or hoist the partitioning logic into the producer / record accumulator (proposed here via returning -1; a sketch of such a partitioner follows).
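
Below is a hedged sketch of the "return -1" approach from the partitioner's point of view (the class name is hypothetical; this is not the proposed built-in implementation): returning -1 for unkeyed records tells the producer that the partitioner has no preference, so the producer / RecordAccumulator can apply the byte accounting and, as described in the next section, the load-aware logic itself.

    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    // Sketch of a partitioner that delegates unkeyed partitioning to the
    // producer by returning -1; keyed records keep hash-based placement.
    public class DelegatingPartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            if (keyBytes != null) {
                // Keyed records keep the usual murmur2 hash-based placement.
                int numPartitions = cluster.partitionsForTopic(topic).size();
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
            // -1 means "no preference": the partition is chosen later by the
            // producer / record accumulator using the uniform / adaptive logic.
            return -1;
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }

A producer-side check of the returned value (-1 meaning "pick one for me") is then all that is needed to hoist the switching decision into the record accumulator.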

Adaptive Partition Switching

One potential disadvantage of strictly uniform partition switching is that if one of the brokers is lagging behind (cannot sustain its share of throughput), records will keep piling up in the accumulator and will eventually exhaust the buffer pool memory, slowing the production rate down to match the capacity of the slowest broker. To avoid this problem, the partition switch decision can adapt to broker load.

The queue size of batches waiting to be sent is a direct indication of broker load (more loaded brokers would have longer queues). Partition switching takes queue sizes into account when choosing the next partition: the probability of choosing a partition is proportional to the inverse of its queue size (i.e. partitions with longer queues are less likely to be chosen).
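
As an illustration of that rule, here is a sketch (assumed names, not Kafka's actual implementation) of picking the next partition with probability proportional to the inverse of its queue size:

    import java.util.concurrent.ThreadLocalRandom;

    // Sketch of queue-size-aware partition choice: each partition is weighted
    // by 1 / (queueSize + 1), so partitions whose brokers have long send
    // queues are picked less often.  Names and structure are illustrative.
    public class QueueAwarePartitionChooser {

        // queueSizes[p] = number of batches currently queued for partition p.
        public int choosePartition(int[] queueSizes) {
            double[] weights = new double[queueSizes.length];
            double total = 0.0;
            for (int p = 0; p < queueSizes.length; p++) {
                weights[p] = 1.0 / (queueSizes[p] + 1);  // +1 avoids division by zero
                total += weights[p];
            }
            // Sample a partition with probability proportional to its weight.
            double r = ThreadLocalRandom.current().nextDouble(total);
            double cumulative = 0.0;
            for (int p = 0; p < queueSizes.length; p++) {
                cumulative += weights[p];
                if (r < cumulative)
                    return p;
            }
            return queueSizes.length - 1;  // guard against floating-point rounding
        }
    }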

In addition to the queue-size-based logic, partition.availability.timeout.ms can be set to a non-0 value, in which case partitions that have had batches ready to be sent for more than partition.availability.timeout.ms milliseconds would be marked as not available for partitioning and would not be chosen until the broker is able to accept the next ready batch from the partition.

Adaptive partition switching can be turned off by setting enable.adaptive.partitioning = false.
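
For reference, a hedged sketch of how these settings could be supplied to a producer is shown below; the bootstrap address, serializers and the 200 ms value are illustrative choices, and the two configuration names are the ones proposed above.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    // Illustrative producer configuration using the settings described above.
    public class AdaptivePartitioningConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            // Leave adaptive partition switching on (it can be turned off by
            // setting this to false), and mark a partition unavailable once its
            // ready batches have waited longer than 200 ms (illustrative value).
            props.put("enable.adaptive.partitioning", "true");
            props.put("partition.availability.timeout.ms", "200");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Produce records as usual; unkeyed records get the uniform /
                // adaptive partitioning behavior described above.
            }
        }
    }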

From the implementation perspective, the partitioner doesn't know anything about queue sizes or broker readiness (but that information can be gathered in the RecordAccumulator object), so here too we'd either need to call back into the producer from the partitioner (rejected approach) or hoist the partitioning logic into the producer / record accumulator (proposed here via returning -1).

Note that these changes do not affect partitioning for keyed messages, only partitioning for unkeyed messages.

...