Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: Under Discussion

Discussion thread: here  

JIRAKafka-14768

Motivation

Sometimes,

...

When application try to reduce the max.block.ms to decrease the blocking time. They will find they couldn't change the value to any one which is smaller than the time costed for metadata's fetch. What's more, metadata's fetch is one heavy operation which cost a lot of time.

Take our project as example. we will take about 4 seconds to complete the metadata's fetch. So, we can't change the max.block.ms to any value < 4000ms.

After analyzing the issue. The root cause is the configured max.block.ms is shared by "metadata fetch" operation and "append record" operation. We can refer to follow table in detail:

where to block

when it is blocked

how long it will be blocked?

...

org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata

...

The first request which need to load the metadata from kafka

...

<max.block.ms

...

org.apache.kafka.clients.producer.internals.RecordAccumulator#append

...

At peak time for business, if the network can’t send message in short time.

...

<max.block.ms

we found the users' functional interaction take a lot of time. At last, we figure out the root cause is that after we complete deploy or restart the servers.

The first message's delivery on each application server by kafka client will take much time.

After analyzing the source code about the first time's sending logic. The time cost is caused by the getting metadata before the sending. The latter's sending won't take the much time due to the cached metadata. The metadata's fetching logic is right and necessary. Thus, we still want to improve the experience for the first message's send/user first interaction.

So, This KIP try to raise one solution to improve it.

The solution is that we can provide one method for metadata fetch into Producer.  When the application restarted/started, it can call it before mark the application is ready for handle requests. So, when the first request/record be handled. the metadata had been fetched so that it's handle's speed will be much faster.

Code Block
public Cluster getCluster(String topic, long maxBlockTimeMs) {
        Objects.requireNonNull(topic, "topic cannot be null");
        try {
            return waitOnMetadata(topic, null, time.milliseconds(), maxBlockTimeMs).cluster;
        } catch (InterruptedException e) {
            throw new InterruptException(e);
}

note: waitOnMetadata(topic, null, time.milliseconds(), maxBlockTimeMs) is the existed method with provide modifier.

Public Interfaces

add new interface with tiny refactor which reduce the duplicated code.

Cluster getCluster(String topic, long maxBlockTimeMs);

What's more, the metadata's fetch only need to be done one time in KafkaProducer#send. After the complete of first fetch, the metadata will be retrieved from cache directly and its timer update only happen on network thread instead of user's thread.

So, this KIP try to reach the goal we can reduce the blocking time by changing the max.block.ms to wanted smaller value without worrying about the metadata's fetch. 

Public Interfaces

No public interface changed. Just change the inner implement of private method:

org.apache.kafka.clients.producer.KafkaProducer#doSend

Add two new configure items for producer.

Proposed Changes

The changes can refer to the example PR:   https://github.com/apache/kafka/pull/1333513320/files

Add two configures with tiny code changes related which control the timeout in KafkaProducer#send

1. Two configures added

...

Producer's configure.

...

configure item.

default value

...

includeWaitTimeOnMetadataInMaxBlockTime

...

max.block.ms.include.metadata

...

false

...

maxWaitTimeMsOnMetadata

...

max.block.metadata.ms

...

The core code had been listed in Motivation part.

2. Code changes

By default, includeWaitTimeOnMetadataInMaxBlockTime is true, all of the behaviors are not changed.

When user set includeWaitTimeOnMetadataInMaxBlockTime to false, KafkaProducer#send will block maxWaitTimeMsOnMetadata for metadata's fetch and block max.block.ms for remaining operations.

Compatibility, Deprecation, and Migration Plan

If user want to use the feature, user can upgrade the client with can call the new configures setadded method.

If user don't have requirement for it, there isn't any need to do any change.

What's more, new client version's upgrade also won't influence existed behavior.

  • What impact (if any) will there be on existing users?  
    no impact on existed users.
  • If we are changing behavior how will we phase out the older behavior?
    no changing older behavior.
  • If we need special migration tools, describe them here.
    no.
  • When will we remove the existing behavior?
    no need to remove.

Test Plan

We can test with test matrix:

if we need N (2<N<5) seconds for metadata's fetch, we will send record to test producer with different configures.

...

Cases to send record.\Configures

...

max.block.ms

...

includeWaitTimeOnMetadataInMaxBlockTime(max.block.ms.include.metadata)

...

maxWaitTimeMsOnMetadata(max.block.metadata.ms)

...

1

...

10 seconds

...

default value: false  

...

default value: 60 seconds 

...

2

...

1 seconds

...

default value: false  

...

default value: 60 seconds 

...

3

...

10 seconds

...

true

...

default value: 60 seconds 

...

4

...

1 seconds

...

true

...

5 seconds

...

5

...

1 seconds

...

true

...

1 seconds

Compare the first record's send time cost to see if any improvement happens.

Rejected Alternatives

maybe we can provide one dedicated method with more naming instead of "getCluster".

 

Case 2 and case 5 will fail to send records. All of others are success.

Rejected Alternatives

Alternative 1: