...

Furthermore, batch.size is currently used under two conditions (a rough sketch follows the list):

1. When we append records to a batch in the accumulator, we create a new batch if the current batch would exceed the batch.size.
2. When we drain the batch from the accumulator, a batch becomes 'ready' when it reaches batch.size.
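Below is a minimal, self-contained sketch of those two conditions. It is not the actual RecordAccumulator code; the types and helper names are simplified stand-ins used only to illustrate where batch.size comes into play.

public class BatchSizeConditionsSketch {
    static final int BATCH_SIZE = 16 * 1024; // batch.size (16KB default)

    static class Batch {
        int bytesUsed = 0;
    }

    static Batch current = new Batch();

    // Condition 1 (append path): start a new batch if appending this record
    // would exceed batch.size.
    static Batch batchFor(int recordBytes) {
        if (current.bytesUsed + recordBytes > BATCH_SIZE) {
            current = new Batch();
        }
        current.bytesUsed += recordBytes;
        return current;
    }

    // Condition 2 (drain path): a batch is considered "ready" once it reaches
    // batch.size (linger.ms expiry is the other trigger, omitted here).
    static boolean isReady(Batch batch) {
        return batch.bytesUsed >= BATCH_SIZE;
    }

    public static void main(String[] args) {
        batchFor(10 * 1024);
        System.out.println("ready after 10KB? " + isReady(current)); // false
        batchFor(10 * 1024); // would exceed 16KB, so a new batch is created
        System.out.println("ready after roll?  " + isReady(current)); // false, new batch holds 10KB
    }
}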

The second condition works well with the current batch.size: if linger.ms is greater than 0, reaching batch.size triggers the send, which is exactly the batching goal.

The first condition, however, leads to creating many small batches when network latency or the production rate (or both) is high. With 5 in-flight requests and 16KB batches, we can only have 80KB of data in flight per partition, which means that with 50ms latency we can push only ~1.6MB/sec per partition (and less at higher latencies, e.g. only ~0.8MB/sec at 100ms).
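The following back-of-the-envelope sketch reproduces that throughput ceiling. The 5 in-flight requests (max.in.flight.requests.per.connection) and 16KB batch.size are the current defaults mentioned above; the latency values are illustrative only.

public class InflightThroughputSketch {
    public static void main(String[] args) {
        int maxInFlight = 5;            // max.in.flight.requests.per.connection default
        int batchSizeBytes = 16 * 1024; // batch.size default (16KB)
        double[] latenciesMs = {50, 100};

        for (double latencyMs : latenciesMs) {
            double inFlightBytes = maxInFlight * batchSizeBytes;       // 80KB in flight per partition
            double bytesPerSec = inFlightBytes / (latencyMs / 1000.0); // bytes per second
            System.out.printf("latency=%.0fms -> ~%.2f MB/sec per partition%n",
                    latencyMs, bytesPerSec / (1024 * 1024));
        }
    }
}

Running this prints roughly 1.56 MB/sec at 50ms and 0.78 MB/sec at 100ms, matching the ~1.6MB/sec and ~0.8MB/sec figures above.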

...

In this KIP, I'm going to introduce a dynamically expandable buffer size for the producer, with a larger allowable batch size (i.e. "batch.max.size").

Goals:

  1. higher throughput: with "batch.max.size" introduced, a batch can hold more than "batch.size" bytes (up to "batch.max.size") and still be sent as a single batch. So even under a sudden spike in the produce rate, throughput remains good.
  2. better memory usage: with "batch.initial.size" introduced, the initial memory allocation is small. So when setting "batch.size", you no longer have to weigh it against wasted memory.
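As a rough illustration of how a producer might be configured once this KIP lands, here is a sketch using the proposed config names from the text; the values are purely illustrative, and until the KIP is implemented the new keys would simply be reported as unknown configs by the client.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ExpandableBatchConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Existing config: still the "ready to drain" threshold.
        props.put("batch.size", 16 * 1024);
        // Proposed configs from this KIP; names from the text, values illustrative.
        props.put("batch.initial.size", 4 * 1024);   // small initial allocation per batch
        props.put("batch.max.size", 256 * 1024);     // upper bound a single batch may grow to

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... send records as usual; only the internal batching behavior changes
        }
    }
}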

Public Interfaces

Two producer configs will be introduced:

...