...

Records are accumulated and then sent as soon as either of the two thresholds above is reached.
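As a rough illustration of the "either threshold" rule, here is a minimal sketch. It assumes the two thresholds are "batch.size" (bytes) and "linger.ms" (milliseconds); the class and method names are hypothetical and this is a simplified model, not the producer's actual code.

```java
// Simplified model of the two send thresholds; NOT the actual Kafka producer logic.
final class SendThresholds {
    private final int batchSizeBytes; // models "batch.size"
    private final long lingerMs;      // models "linger.ms"

    SendThresholds(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /** Returns true once either threshold has been reached. */
    boolean readyToSend(int accumulatedBytes, long waitedMs) {
        return accumulatedBytes >= batchSizeBytes || waitedMs >= lingerMs;
    }

    public static void main(String[] args) {
        SendThresholds thresholds = new SendThresholds(16 * 1024, 5);
        // Small batch that has not lingered long enough: keep accumulating.
        System.out.println(thresholds.readyToSend(1024, 1));
        // Full batch: send even though linger time has not expired.
        System.out.println(thresholds.readyToSend(16 * 1024, 1));
        // Linger time expired: send even though the batch is mostly empty.
        System.out.println(thresholds.readyToSend(1024, 5));
    }
}
```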


The high-level component diagram looks like this:

[Image: high-level component diagram]


However, when we set the batch size, we'll run into a dilemma:

...

That explains why we have the dilemma when setting this config.


Using the example above, suppose we set "batch.size" to the default of 16KB.

1. Off-peak time: the graph might look like this (when linger.ms has expired):

[Image: off-peak case with batch.size = 16KB]

We can see that the memory utilization is very low: most of the allocated buffer is unused.

2. Peak time: the graph might look like this:

[Image: peak-time case with batch.size = 16KB]

We can see that the batch is full, so we need to create a new batch for the remaining records and send the full batch out soon.


In the peak-time case, when we notice this situation, we might want to increase the batch size, say from 16KB to 20KB. But once we increase batch.size, every batch will be allocated with 20KB from then on, even during off-peak time (see case 1). That's wasteful.


In this KIP, I propose a dynamically expandable buffer size for the producer.

...

We'll allocate "batch.initial.size" bytes of memory when new records are sent to an empty partition. Once the records accumulated in that partition fill the current buffer (ex: 2KB), we'll expand the buffer by the "batch.reallocation.factor" (ex: 2KB * 2 = 4KB) and keep accumulating records until we reach "batch.size" or "linger.ms" expires.
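The growth rule described above can be sketched as follows. This is only an illustration under stated assumptions: the factor is treated as an integer of at least 2, the expanded size is capped at "batch.size", and the class and method names are hypothetical rather than part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the buffer growth sequence; names are hypothetical.
final class BatchGrowth {
    /** Next capacity: current * factor, capped at maxSize (models "batch.size"). */
    static int nextCapacity(int current, int factor, int maxSize) {
        long grown = (long) current * factor; // use long to avoid int overflow
        return (int) Math.min(grown, (long) maxSize);
    }

    /** All capacities a batch passes through, from the initial size up to maxSize. */
    static List<Integer> capacities(int initialSize, int factor, int maxSize) {
        if (initialSize <= 0 || factor < 2 || maxSize < initialSize) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        List<Integer> sizes = new ArrayList<>();
        int size = initialSize;
        sizes.add(size);
        while (size < maxSize) {
            size = nextCapacity(size, factor, maxSize);
            sizes.add(size);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // batch.initial.size = 2KB, batch.reallocation.factor = 2, batch.size = 16KB
        System.out.println(capacities(2 * 1024, 2, 16 * 1024));
        // prints [2048, 4096, 8192, 16384]
    }
}
```

With a 20KB upper bound and a 4KB initial size, the same rule would yield 4KB, 8KB, 16KB and finally 20KB, because the last step is capped at "batch.size".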

In the "BufferPool" class, we used to keep a "free" queue (Deque) holding buffers of "batch.size" bytes, so that we can reduce the cost of deallocating and reallocating memory. Now we can turn "free" into a "Map<Integer, Deque>" that maps each buffer size to the deque of free buffers of that size.
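A minimal sketch of such a size-keyed free list is shown below. It is not the real "BufferPool": the class name and methods are hypothetical, and it leaves out locking, total-memory accounting and blocking on exhaustion, all of which a real pool would need.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: free buffers grouped by capacity. Not thread-safe.
final class SizedFreeLists {
    // buffer size -> free buffers of exactly that size
    private final Map<Integer, Deque<ByteBuffer>> free = new HashMap<>();

    /** Reuses a pooled buffer of the requested size if one exists, else allocates. */
    ByteBuffer allocate(int size) {
        Deque<ByteBuffer> buffers = free.get(size);
        if (buffers != null && !buffers.isEmpty()) {
            return buffers.pollFirst();
        }
        return ByteBuffer.allocate(size);
    }

    /** Returns a buffer to the free list that matches its capacity. */
    void release(ByteBuffer buffer) {
        buffer.clear(); // reset position and limit before reuse
        free.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>()).addLast(buffer);
    }

    /** Number of pooled buffers of the given size. */
    int freeCount(int size) {
        Deque<ByteBuffer> buffers = free.get(size);
        return buffers == null ? 0 : buffers.size();
    }
}
```

Keying by exact size keeps lookup simple, at the cost of holding separate deques for every size that the expansion steps produce.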

Please note that buffer expansion is an array copy (internally we use ByteBuffer), so it is not a free operation. Please take the cost of expansion into account and set a reasonable "batch.initial.size".
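To make the copy cost concrete, here is a small sketch of what one expansion step could look like with java.nio.ByteBuffer. The helper name is hypothetical and heap buffers are assumed; every expansion copies all bytes written so far, which is why a very small initial size leads to repeated copying.

```java
import java.nio.ByteBuffer;

// Illustrative sketch of a single expansion step; not the proposed implementation.
final class BufferExpansion {
    /** Copies the bytes written so far into a new, larger buffer. */
    static ByteBuffer expand(ByteBuffer old, int newCapacity) {
        if (newCapacity < old.position()) {
            throw new IllegalArgumentException("new capacity is smaller than the written data");
        }
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
        ByteBuffer source = old.duplicate(); // shares content, independent position/limit
        source.flip();                       // limit = bytes written, position = 0
        bigger.put(source);                  // the actual copy: O(bytes written)
        return bigger;                       // position is left after the copied data
    }

    public static void main(String[] args) {
        ByteBuffer small = ByteBuffer.allocate(2 * 1024);
        small.put(new byte[2 * 1024]); // the 2KB buffer is now full
        ByteBuffer bigger = expand(small, 4 * 1024);
        System.out.println(bigger.capacity() + " bytes, " + bigger.remaining() + " free");
    }
}
```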


So, let's revisit the two cases above:

1. Off-peak time

[Image: off-peak case with batch.initial.size]

We can see that the memory utilization is now high, because we allocate only batch.initial.size (4KB) at first.

2. Peak time

[Image: peak-time case with batch.initial.size]

With the batch.initial.size config introduced, we can set the upper bound batch.size higher, because we know that much buffer will be allocated only when necessary.


Compatibility, Deprecation, and Migration Plan

...