
Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka provides high throughput in each component. In the producer, there are two key configs that support high throughput:

  1. batch.size: specifies the maximum batch size in bytes per partition (default 16384)
  2. linger.ms: specifies the maximum time in milliseconds to wait for a batch to fill (default 0, i.e., no delay)

A batch of records is sent once either of these two thresholds is reached.
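
For illustration, here is a minimal sketch of a plain Java producer tuned with these two existing configs (the broker address, topic name, and values are placeholders, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class BatchingExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // A batch is sent once it reaches 16384 bytes (batch.size) ...
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            // ... or once 10 ms have elapsed (linger.ms), whichever comes first.
            props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            }
        }
    }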


However, when we set the batch size, we run into a dilemma:

  1. either we get higher throughput but waste more memory,
  2. or we get lower throughput but waste less memory.

Why is that?

We can check the description of "batch.size" in the documentation here. Here's the last paragraph of that description:

A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.


That explains why we have the dilemma when setting this config.


In this KIP, I'm going to introduce a dynamically expandable buffer size for the producer.

Public Interfaces

Two producer configs will be introduced:

  1. batch.initial.size: specifies the initial batch size when a new batch is created (default is 0 (disabled), which means we always allocate a "batch.size" buffer and no buffer expansion happens)
  2. batch.reallocation.factor: specifies the growth factor used when reallocating a larger buffer (default is 2)

For better memory usage, the recommended relation between the configurations is: "batch.size" = "batch.initial.size" * "batch.reallocation.factor"^n (where n is the number of expansions)

ex: "batch.size" = 16KB, "batch.initial.size" = 2KB, "batch.reallocation.factor" = 2

16KB = 2KB * 2^3
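
A minimal sketch of this relation, computing how many expansions the example values imply (the variable names are just local identifiers for illustration, not part of any producer API):

    public class ExpansionCount {
        public static void main(String[] args) {
            int batchSize = 16 * 1024;         // batch.size
            int batchInitialSize = 2 * 1024;   // batch.initial.size (proposed)
            int reallocationFactor = 2;        // batch.reallocation.factor (proposed)

            // Grow the buffer by the factor until it reaches batch.size.
            int size = batchInitialSize;
            int n = 0;
            while (size < batchSize) {
                size *= reallocationFactor;
                n++;
            }
            System.out.println(n);  // prints 3, since 16KB = 2KB * 2^3
        }
    }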

Proposed Changes

We'll allocate "batch.initial.size" of memory when new records are sent to an empty partition. As more records accumulate in the partition and the buffer fills up to "batch.initial.size" (ex: 2KB), we expand the buffer by "batch.reallocation.factor" (ex: 2KB * 2 = 4KB) and keep accumulating records, until we reach "batch.size" or "linger.ms" expires.

Please note, buffer expansion is an array copy operation (internally we use ByteBuffer), so it is not free. Please consider the cost of expansion and set a reasonable "batch.initial.size".
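
The following is a minimal sketch of that expansion step, assuming a simple allocate-and-copy strategy; the method name and signature are illustrative and not the actual RecordAccumulator implementation:

    import java.nio.ByteBuffer;

    public class BufferExpansion {
        // Grow the buffer by the reallocation factor, capped at the max batch size.
        static ByteBuffer expand(ByteBuffer current, int reallocationFactor, int maxBatchSize) {
            int newCapacity = Math.min(current.capacity() * reallocationFactor, maxBatchSize);
            ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
            current.flip();       // prepare existing bytes for reading
            bigger.put(current);  // the array copy: this is the non-free part of expansion
            return bigger;
        }

        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.allocate(2 * 1024);  // batch.initial.size = 2KB
            // Expand 3 times: 2KB -> 4KB -> 8KB -> 16KB (batch.size)
            for (int i = 0; i < 3; i++) {
                buf = expand(buf, 2, 16 * 1024);
            }
            System.out.println("Final capacity: " + buf.capacity());  // 16384
        }
    }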



Compatibility, Deprecation, and Migration Plan

Because the "batch.initial.size" default value is 0(disabled), which means we'll always allocate "batch.size" buffer and no buffer expansion will happen, there will be always backward compatible. No migration plan is needed.

Rejected Alternatives

n/a
