...

In the peak-time case, when we notice this situation, we might want to increase the batch size, say from 16KB to 20KB. But once we increase batch.size, all batches will be allocated with 20KB from then on, even during off-peak time (see case 1). That's wasteful.


Furthermore, currently batch.size is used in two conditions:

1. When we append records to a batch in the accumulator, we create a new batch if the current batch would exceed the batch.size.
2. When we drain the batch from the accumulator, a batch becomes 'ready' when it reaches batch.size.

The second condition is good with the current batch size, because if linger.ms is greater than 0, the send can be triggered by accomplishing the batching goal.

The first condition, though, leads to creating many batches if the network latency or production rate (or both) is high: with 5 in-flight requests and 16KB batches, we can only have 80KB of data in flight per partition. This means that with 50ms latency we can only push ~1.6MB/sec per partition (and it gets worse with higher latencies, e.g. with 100ms we can only push ~0.8MB/sec).
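For reference, these numbers come from a simple back-of-the-envelope calculation (assuming the default of 5 max in-flight requests per connection, one full 16KB batch per request, and that each request takes one network round trip):

  max in-flight data per partition: 5 requests * 16KB = 80KB
  throughput at 50ms round trip:    80KB / 0.05s ≈ 1.6MB/sec
  throughput at 100ms round trip:   80KB / 0.1s  ≈ 0.8MB/sec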


In this KIP, I'm going to introduce a dynamically expandable buffer size for the producer, with a larger allowable batch size (i.e. "batch.max.size").

Public Interfaces

Two producer configs will be introduced (a usage sketch follows the list):

  1. batch.initial.size: It specifies the initial batch size when a new batch is created. The default is 0 (disabled), which means we'll always allocate a "batch.size" buffer and no buffer expansion will happen.
  2. batch.max.size: It specifies the maximum batch size in bytes per partition; it should be less than "max.request.size". The default is 262144 (256KB).
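
As a usage sketch only: the two proposed keys below do not exist in the producer today, and the class name is made up for illustration. Assuming the KIP is implemented, they would be set like any other producer config:

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;

  public class ExpandableBatchProducerExample {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("batch.size", 16384);         // existing config: the batching ("ready") threshold, 16KB
          props.put("batch.initial.size", 2048);  // proposed in this KIP: initial buffer per batch, 2KB
          props.put("batch.max.size", 262144);    // proposed in this KIP: upper bound per batch, 256KB
          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
              // produce records as usual; only the accumulator's batching behaviour changes
          }
      }
  }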

To make better use of memory, the relation between the configurations should be:

  1. "batch.size" = "batch.initial.size" * n (where n is the number of expansions, a positive integer)
  2. "batch.max.size" = "batch.initial.size" * n (where n is the number of expansions, a positive integer)

For example:

1. "batch.size" = 16KB, "batch.initial.size" = 2KB

=> 16KB = 2KB * 8

2. "batch.size" = 16KB, "batch.initial.size" = 0 (default, disabled), "batch.max.size" = 256KB

=> 256KB = 16KB * 16 (with "batch.initial.size" disabled, "batch.size" is used as the initial size)
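
A small sanity-check sketch (the method name is hypothetical, not part of the KIP) makes this multiple-of relation explicit:

  static void validateBatchSizes(int batchInitialSize, int batchSize, int batchMaxSize) {
      // When batch.initial.size is 0 (disabled), batch.size is used as the initial allocation.
      int initial = (batchInitialSize == 0) ? batchSize : batchInitialSize;
      if (batchSize % initial != 0 || batchMaxSize % initial != 0) {
          throw new IllegalArgumentException(
              "batch.size and batch.max.size should be multiples of the initial batch size");
      }
      // e.g. 16KB = 2KB * 8, 256KB = 16KB * 16
      System.out.println("expansions to reach batch.size: " + (batchSize / initial)
          + ", expansions to reach batch.max.size: " + (batchMaxSize / initial));
  }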

Proposed Changes

We'll allocate the "batch.initial.size" (default disabled, which will use "batch.size" as initial batch size) memory when new records send to an empty partition buffer. While we accumulated more records in the partitions to reach the "batch.initial.size" (ex: 2KB), we'll do buffer expansion to allocate another "batch.initial.size" of buffer and list with the previous buffer. And then, keeps accumulating the records.

When reaching the "batch.size" of the buffer, it means the buffer is "ready" to be be sent. But before the sender thread is ready to send the batch, if there are more data coming, until we reach we can still accumulate it into the same buffer, until it reached the "batch.max.size", or . After it reached the "lingerbatch.max.ms" expiredsize", we'll create another batch for it.


So, let's revisit the two cases above:

1. off-peak time

(Image: memory usage in the off-peak case)


We can see that the memory usage is still somewhat high, because we allocate batch.initial.size (4KB) for each chunk, but it is much lower than allocating the full batch.size up front.

2. peak-time

(Image: batch accumulation in the peak-time case)

With the batch.max.size config introduced, we can set the upper bound higher, because we know we can accumulate a larger batch before the sender thread is ready to send it, and thus achieve higher throughput.


Compatibility, Deprecation, and Migration Plan

Because the "batch.initial.size" default value is 0(disabled), which means we'll always allocate "batch.size" buffer and no buffer expansion will happen, there will be always backward compatible. No migration plan is needed.

Rejected Alternatives

From the discussion thread: "I think we should also consider tweaking the semantics of batch.size so that the sent batches can be larger if the batch is not ready to be sent (while still respecting max.request.size and perhaps a new max.batch.size)."

And the "batch.max.size" is transient to users. So no migration plan is needed.

Rejected Alternatives

In the KIP, I was trying to make "batch.size" the upper bound of the batch size and introduce "batch.initial.size" as the initial batch size. So are you saying that we can let "batch.size" be the initial batch size and introduce a "max.batch.size" as the upper bound? That's a good suggestion, but it would change the semantics of "batch.size", which might surprise some users. I think my original proposal ("batch.initial.size") is safer for users.