...

Application threads sometimes block for up to max.block.ms when sending records with KafkaProducer#send. In some cases this exhausts the threads of the whole system for that period.

When an application tries to reduce max.block.ms to shorten the blocking time, it finds that the value cannot be set lower than the time needed to fetch the metadata. Moreover, the metadata fetch is a heavy operation that takes a long time.

Take our project as an example: the metadata fetch takes about 4 seconds to complete, so we cannot set max.block.ms to any value below 4000 ms.


After analyzing the issue, we found that the root cause is that the configured max.block.ms is shared by the "metadata fetch" operation and the "append record" operation. The following table gives the details:

| Where it blocks | When it blocks | How long it blocks |
|---|---|---|
| org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata | On the first request, which needs to load the metadata from Kafka | < max.block.ms |
| org.apache.kafka.clients.producer.internals.RecordAccumulator#append | At business peak times, when the network cannot send messages quickly enough | < max.block.ms |

Moreover, the metadata fetch only needs to be done once during the blocking in KafkaProducer#send. After the fetch completes, the metadata is read directly from the cache, and its timer is updated only on the network thread.
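The shared budget described above can be illustrated with a little arithmetic. The sketch below is a simplified model, not the Kafka client source: the class `SharedBlockBudget` and its method names are hypothetical, and the 4000 ms fetch time is the figure from the project example.

```java
// Simplified model of the single max.block.ms budget inside KafkaProducer#send.
// Illustration only; this is not the Kafka client source.
final class SharedBlockBudget {

    /** Time left for RecordAccumulator#append once the metadata wait is over (never negative). */
    static long remainingForAppend(long maxBlockMs, long waitedOnMetadataMs) {
        return Math.max(0L, maxBlockMs - waitedOnMetadataMs);
    }

    /** Whether a metadata fetch of the given duration can finish within max.block.ms. */
    static boolean metadataFetchFits(long maxBlockMs, long metadataFetchMs) {
        return metadataFetchMs <= maxBlockMs;
    }

    public static void main(String[] args) {
        long metadataFetchMs = 4_000L; // the 4-second fetch from the project example
        for (long maxBlockMs : new long[] {60_000L, 10_000L, 1_000L}) {
            System.out.printf("max.block.ms=%d metadataFits=%b remainingForAppend=%d ms%n",
                    maxBlockMs,
                    metadataFetchFits(maxBlockMs, metadataFetchMs),
                    remainingForAppend(maxBlockMs, metadataFetchMs));
        }
    }
}
```

With a 1000 ms budget the fetch cannot complete at all, which is why max.block.ms cannot be lowered below the fetch time while the two operations share one timeout.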

...

| Producer field | Configuration item | Default value |
|---|---|---|
| includeWaitTimeOnMetadataInMaxBlockTime | max.block.ms.include.metadata | false |
| maxWaitTimeMsOnMetadata | max.block.metadata.ms | < max.block.ms |

2. Code changes

By default, includeWaitTimeOnMetadataInMaxBlockTime is true, and the existing behavior is unchanged.

When the user sets includeWaitTimeOnMetadataInMaxBlockTime to false, KafkaProducer#send blocks for up to maxWaitTimeMsOnMetadata on the metadata fetch and for up to max.block.ms on the remaining operations.
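A minimal sketch of the two modes, following the description in this section, may help make the split concrete. The class `SplitBlockBudget` and its methods are hypothetical; only the parameter names are taken from the proposal, and the real change would live inside KafkaProducer.

```java
// Sketch of the proposed timeout split. The parameter names mirror the proposal;
// the class and methods are hypothetical and not part of the Kafka client.
final class SplitBlockBudget {

    /** Upper bound on the time spent waiting for metadata. */
    static long metadataBudgetMs(boolean includeWaitTimeOnMetadataInMaxBlockTime,
                                 long maxBlockMs,
                                 long maxWaitTimeMsOnMetadata) {
        // true: the metadata wait counts against max.block.ms (existing behavior)
        // false: the metadata wait has its own limit
        return includeWaitTimeOnMetadataInMaxBlockTime ? maxBlockMs : maxWaitTimeMsOnMetadata;
    }

    /** Upper bound on the remaining operations after waitedOnMetadataMs was spent on metadata. */
    static long remainingBudgetMs(boolean includeWaitTimeOnMetadataInMaxBlockTime,
                                  long maxBlockMs,
                                  long waitedOnMetadataMs) {
        return includeWaitTimeOnMetadataInMaxBlockTime
                ? Math.max(0L, maxBlockMs - waitedOnMetadataMs) // shared budget
                : maxBlockMs;                                   // full budget for the rest
    }

    public static void main(String[] args) {
        long maxBlockMs = 10_000L;
        long maxWaitTimeMsOnMetadata = 5_000L;
        long waitedOnMetadataMs = 4_000L;
        for (boolean include : new boolean[] {true, false}) {
            System.out.printf("include=%b metadataBudget=%d ms remainingBudget=%d ms%n",
                    include,
                    metadataBudgetMs(include, maxBlockMs, maxWaitTimeMsOnMetadata),
                    remainingBudgetMs(include, maxBlockMs, waitedOnMetadataMs));
        }
    }
}
```

Under these assumptions, setting the flag to false lets max.block.ms be tuned for the append path alone, while the metadata wait is bounded separately.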

...

  • What impact (if any) will there be on existing users?
    No impact on existing users.
  • If we are changing behavior how will we phase out the older behavior?
    The older behavior is not changed.
  • If we need special migration tools, describe them here.
    None are needed.
  • When will we remove the existing behavior?
    There is no need to remove it.

Test Plan


We can test with the following test matrix:

Assuming the metadata fetch takes N seconds (2 < N < 5), we send records through a test producer with different configurations.

| Case | max.block.ms | includeWaitTimeOnMetadataInMaxBlockTime (max.block.ms.include.metadata) | maxWaitTimeMsOnMetadata (max.block.metadata.ms) |
|---|---|---|---|
| Case 1: success | 10 seconds | default value: false (not set) | default value: 60 seconds (not set) |
| Case 2: fail to send | 1 second | default value: false (not set) | default value: 60 seconds (not set) |
| Case 3: success | 10 seconds | true | default value: 60 seconds (not set) |
| Case 4: success | 1 second | true | 5 seconds |
| Case 5: fail to send | 1 second | true | 1 second |


Rejected Alternatives

One alternative is to provide a new method that completes the metadata fetch without being controlled by max.block.ms, which the user would call before sending any record. For example, the user could call it before marking the service as ready.

...