...

  1. batch.initial.size: specifies the initial batch size when a new batch is created (default is 0, i.e. disabled, which means we'll always allocate a "batch.size" buffer and no buffer expansion will happen)
  2. batch.reallocation.factor: specifies the factor by which we grow the buffer when we reallocate it (default is 2)

For better memory usage, the recommended relation between the configurations is: "batch.size" = "batch.initial.size" * "batch.reallocation.factor"^n (where n is the number of expansions)

ex: "batch.size" = 16KB, "batch.initial.size" = 2KB, "batch.reallocation.factor" = 2

16KB = 2KB * 2^3
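
Putting the example above into producer configuration form, the settings would look roughly like this. Note that batch.initial.size and batch.reallocation.factor are only proposed in this KIP and do not exist in current Kafka releases, so the string keys below are assumptions:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class ProposedBatchConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Existing config: the upper bound of a batch (16KB).
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);

        // Proposed configs (names assumed from this KIP, not in current releases):
        // start each batch at 2KB and double it on each expansion,
        // so 16KB = 2KB * 2^3, i.e. at most three expansions per batch.
        props.put("batch.initial.size", 2 * 1024);
        props.put("batch.reallocation.factor", 2);
    }
}
```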

Proposed Changes

We'll allocate "batch.initial.size" of memory when new records are sent to an empty partition buffer. Once the accumulated records in the partition fill that buffer (ex: 2KB), we do a buffer expansion: allocate a new buffer that is "batch.reallocation.factor" times larger (ex: 2KB * 2 = 4KB) and carry over the previously accumulated records. We then keep accumulating records, expanding as needed, until we reach "batch.size" or "linger.ms" expires.

In the "BufferPool" class, we used to keep a "free" queue (Deque) of buffers of "batch.size" bytes, so that we can reduce the cost of de-allocating/re-allocating memory. Now we can make "free" a "Map<Integer, Deque>" that maps buffer size → free buffers of that size.
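
A minimal sketch of that per-size free list, assuming a much-simplified pool (the real BufferPool also enforces the total buffer.memory limit, blocks callers when memory is exhausted, and so on):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified illustration of keeping free buffers per size instead of a single
// Deque of batch.size buffers. Not the actual BufferPool implementation.
public class SizedBufferPoolSketch {
    private final Map<Integer, Deque<ByteBuffer>> free = new HashMap<>();

    public synchronized ByteBuffer allocate(int size) {
        Deque<ByteBuffer> buffers = free.get(size);
        if (buffers != null && !buffers.isEmpty()) {
            return buffers.pollFirst();   // reuse a cached buffer of this size
        }
        return ByteBuffer.allocate(size); // otherwise allocate a new one
    }

    public synchronized void deallocate(ByteBuffer buffer) {
        buffer.clear();
        free.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>())
            .addLast(buffer);             // return it to the per-size free list
    }
}
```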

Please note that buffer expansion is an array copy process (internally we use ByteBuffer), so it's not a free operation. Please consider the cost of expansion and set a reasonable "batch.initial.size".
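
To make that cost concrete, an expansion step is roughly the following array copy (a hypothetical helper for illustration, not Kafka's actual code):

```java
import java.nio.ByteBuffer;

public class BufferExpansionSketch {
    // Hypothetical expansion step: grow the buffer by the reallocation factor
    // (capped at batch.size) and copy the already-written bytes.
    static ByteBuffer expand(ByteBuffer current, int reallocationFactor, int maxBatchSize) {
        int newCapacity = Math.min(current.capacity() * reallocationFactor, maxBatchSize);
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
        current.flip();      // switch to read mode: position=0, limit=bytes written
        bigger.put(current); // O(n) copy of everything accumulated so far
        return bigger;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(2 * 1024); // batch.initial.size = 2KB
        buf.put(new byte[2 * 1024]);                    // buffer is now full
        buf = expand(buf, 2, 16 * 1024);                // grows to 4KB, copies 2KB
        System.out.println("new capacity: " + buf.capacity());
    }
}
```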


So, let's look at the two cases above:

1. off-peak time

(Figure: memory usage in the off-peak time case)


We can see that memory usage is still not zero, because we allocate batch.initial.size (4KB) up front, but it is much lower than always allocating the full "batch.size" buffer.

2. peak-time

(Figure: memory usage in the peak time case)

With the batch.initial.size config introduced, we can set the upper bound batch.size higher, because we know we will only allocate that much buffer when necessary.

...

Because the "batch.initial.size" default value is 0 (disabled), which means we'll always allocate a "batch.size" buffer and no buffer expansion will happen, this change is fully backward compatible. No migration plan is needed.

Rejected Alternatives

I think we should also consider tweaking the semantics of batch.size so that the sent batches can be larger if the batch is not ready to be sent
(while still respecting max.request.size and perhaps a new max.batch.size).

--> In the KIP, I was trying to make "batch.size" the upper bound of the batch size and introduce "batch.initial.size" as the initial batch size.
So are you saying that we could instead make "batch.size" the initial batch size and introduce a "max.batch.size" as the upper bound?
That's a good suggestion, but it would change the semantics of "batch.size", which might surprise some users. I think my original proposal ("batch.initial.size") is safer for users.