Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Records are sent until either of above 2 thresholds are reached.


The high level producer component diagram is like this:

...

2. peak time, the graph might look like this:

We can see, the tp0 batch is full, and need to create new batch for it and send out this batch soon.

...

  1. higher throughput: with the "batch.max.size" introduced, the batch can allow records >  ("batch.size" (and < records <  "batch.max.size") , and still can to be sent within one batch. So, even a sudden high producer rate, the throughput can still be good.
  2. better memory usage: with "batch.initial.size" introduced, the initial memory allocation will be small. So when setting the "batch.size", you don't have to consider the memory waste issue nowanymore.

Public Interfaces

2 Producer config will be introduced:

  1. batch.initial.size: It specifies the initial batch size when new batch created (default is 0(disabled), which means we'll always allocate , it should be <= "batch.size" buffer and no buffer expansion will happen(default is 4096(4KB))
  2. batch.max.size: It specifies the maximum batch size in bytes per partition, it should be less than <= "max.request.size" and >= "batch.size". ("batch.max.size" default to 262144(256KB))

...

  1. "batch.size" = "batch.initial.size" * n (n means the times we expansion)
  2. "batch.max.size" = "batch.initial.size" * n m (n m means the times we expansion)

...

1. "batch.size" = 16KB, "batch.initial.size" = 2KB4KB

=> 16KB = 2KB * 84

2. "batch.size" = 16KB, "batch.initial.size" = 0 (default, disabled), 4KB, "batch.max.size" = 256KB

=> 256KB = 16KB 4KB * 1664

Proposed Changes

We'll allocate the "batch.initial.size" (default disabled, which will use "batch.size" as initial batch size4KB) memory when new records send to an empty partition buffer. While we accumulated more records in the partitions to reach the "batch.initial.size" (ex: 2KB4KB), we'll do buffer expansion to allocate another "batch.initial.size" of buffer and list with the previous buffer. And then, keeps accumulating the records.

When As current producer, when the batch reaching the "batch.size" of the buffer, it means the buffer is "ready" to be be sent. But now, before the sender thread is ready to send the batch, if there are more data coming, we can still accumulate it into the same buffer, until it reached the "batch.max.size" or sender thread send this batch out. After it reached the "batch.max.size", we'll create another batch for it. Compared with current producer, when reaching the "batch.size", we'll mark it as "ready" to be sent, and create a new batch for the upcoming records.


Internally, we have a BufferPool#free to keep the unused allocated buffers. When allocating a buffer, we'll first check if there are available buffer there before allocating a new one. With the expandable batch introduced, there are chances that sudden high producer rate, there are a lot of buffers being allocated, and then send back to BufferPool unused. Some of the buffers in BufferPool might not be used for a long time. In this KIP, when sending the allocated batches back to BufferPool#free after batch sent, we'll only keep maximum "batch.size" into pool, and mark the rest of memory as free to use. The reason we keep maximum "batch.size" back to pool is because the semantic of "batch.size" is the batch full limit. In most cases, the batch.size should be able to contain the records to be sent within linger.ms time.


So, let's see the 2 cases above

...

Compatibility, Deprecation, and Migration Plan

Because the "batch.initial.size" default value is 0(disabled), which means we'll always allocate "batch.size" buffer and no buffer expansion will happen, there 2 new config for batch size introduced in this KIP. There will be always backward compatible. And the "batch.max.size" is transient to users. So no migration plan is needed.

...