...

Analysis of the issue shows that the root cause is that the configured max.block.ms is shared by the "metadata fetch" operation and the "append record" operation. The following table gives the details:

| Where it blocks | When it blocks | How long it blocks |
|---|---|---|
| org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata | On the first request, which needs to load the metadata from Kafka | < max.block.ms |
| org.apache.kafka.clients.producer.internals.RecordAccumulator#append | At business peak times, if the network cannot send messages quickly enough | < max.block.ms |

Moreover, the metadata fetch only needs to block KafkaProducer#send once. After the fetch completes, the metadata is read directly from the cache, and its timer is updated only on the network thread.

This KIP therefore aims to let users lower max.block.ms to a desired smaller value without having to worry about the metadata fetch.
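The shared budget described above can be illustrated with a small model. This is only a sketch: the class `SharedBlockBudget` and its method are hypothetical names introduced here, not Kafka code, and the model throws `IllegalStateException` where the real producer would raise a timeout error.

```java
/**
 * Hypothetical, simplified model of how one max.block.ms budget is shared
 * between the metadata wait and the record append inside KafkaProducer#send.
 * Not taken from the Kafka code base.
 */
final class SharedBlockBudget {

    /**
     * Returns the milliseconds left for RecordAccumulator#append after the
     * metadata wait has consumed part of the shared max.block.ms budget.
     */
    static long remainingForAppend(long maxBlockMs, long metadataWaitMs) {
        if (metadataWaitMs >= maxBlockMs) {
            // Assumption: stands in for the timeout the real producer reports.
            throw new IllegalStateException("metadata wait exhausted max.block.ms");
        }
        return maxBlockMs - metadataWaitMs;
    }

    public static void main(String[] args) {
        // First send: metadata must be loaded, so part of the budget is used up.
        System.out.println(remainingForAppend(60_000L, 200L)); // prints 59800
        // Later sends: metadata is cached, so the whole budget goes to append.
        System.out.println(remainingForAppend(60_000L, 0L));   // prints 60000
    }
}
```

In this model, lowering max.block.ms to 100 while the first metadata fetch takes 200 ms makes the first send fail, which is the situation the KIP wants to avoid.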

...

  • Binary log format

  • The network protocol and api behavior

  • Configuration, especially client configuration

  • Any class in the public packages under clients

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

...

For the changes, see the example PR:  

Add two configuration options, with small related code changes, that control the timeout in KafkaProducer#send.

1. Two configuration options added

| Producer configuration field | Configuration key | Default value |
|---|---|---|
| includeWaitTimeOnMetadataInMaxBlockTime | max.block.ms.include.metadata | true |
| maxWaitTimeMsOnMetadata | max.block.metadata.ms | < max.block.ms |
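As a rough illustration, a client opting in to the proposed behavior might be configured as follows. The two new keys are the ones proposed in this KIP and do not exist in released Kafka clients. The class name, the broker address, and all numeric values are placeholders chosen for this sketch, and the KIP text does not state the valid range of max.block.metadata.ms.

```java
import java.util.Properties;

/** Hypothetical helper that builds producer properties using the proposed keys. */
final class ProposedProducerConfig {

    static Properties build() {
        Properties props = new Properties();
        // Placeholder broker address.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Existing option: illustrative small value for the append path.
        props.setProperty("max.block.ms", "50");
        // Proposed by this KIP: keep the metadata wait out of max.block.ms.
        props.setProperty("max.block.ms.include.metadata", "false");
        // Proposed by this KIP: separate budget for the metadata fetch (illustrative value).
        props.setProperty("max.block.metadata.ms", "5000");
        return props;
    }
}
```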

2. Code changes

By default, includeWaitTimeOnMetadataInMaxBlockTime is true and existing behavior is unchanged.

When the user sets includeWaitTimeOnMetadataInMaxBlockTime to false, KafkaProducer#send blocks for up to maxWaitTimeMsOnMetadata on the metadata fetch and for up to max.block.ms on the remaining operations.
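The two modes can be sketched as a pair of budget calculations. This is a minimal model under stated assumptions, not the code from the example PR: the class `SplitBlockBudget` and its method names are hypothetical, and only the flag semantics come from the description above.

```java
/**
 * Hypothetical model of the time budgets in KafkaProducer#send under this KIP.
 * Names are illustrative; only the flag semantics follow the KIP text.
 */
final class SplitBlockBudget {

    /** Upper bound, in milliseconds, on the metadata wait. */
    static long metadataBudgetMs(boolean includeMetadataInMaxBlock,
                                 long maxBlockMs,
                                 long maxWaitTimeMsOnMetadata) {
        // Flag true: existing behavior, metadata shares max.block.ms.
        // Flag false: metadata gets its own budget.
        return includeMetadataInMaxBlock ? maxBlockMs : maxWaitTimeMsOnMetadata;
    }

    /** Upper bound, in milliseconds, on the remaining operations (append). */
    static long appendBudgetMs(boolean includeMetadataInMaxBlock,
                               long maxBlockMs,
                               long metadataWaitedMs) {
        // Flag true: append only gets what the metadata wait left over.
        // Flag false: append gets the full max.block.ms.
        return includeMetadataInMaxBlock
                ? Math.max(0L, maxBlockMs - metadataWaitedMs)
                : maxBlockMs;
    }

    public static void main(String[] args) {
        // Existing behavior: a 200 ms metadata wait shrinks the append budget.
        System.out.println(appendBudgetMs(true, 1_000L, 200L));  // prints 800
        // Proposed opt-in behavior: the append budget is unaffected.
        System.out.println(appendBudgetMs(false, 1_000L, 200L)); // prints 1000
    }
}
```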

Compatibility, Deprecation, and Migration Plan

Users who want this feature can upgrade the client and set the new configuration options.

Users who do not need it do not have to change anything. Upgrading to the new version also does not affect existing behavior.

  • What impact (if any) will there be on existing users? No impact on existing users.
  • If we are changing behavior how will we phase out the older behavior? The older behavior is not changed.
  • If we need special migration tools, describe them here. None are needed.
  • When will we remove the existing behavior? There is no need to remove it.

Test Plan

Describe in a few sentences how the KIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

...

One alternative is to provide a new method that completes the metadata fetch outside the control of max.block.ms, which the user should call before sending any record. For example, the user can call it before marking the service ready.

This alternative solves the issue and also solves the slow latency of the first record. However, for users who are only interested in reducing the blocking time and do not care about first-record latency, it is not the best solution: they may forget to call the method before sending, or may not be aware that another method must be called to solve the issue.


Another possible alternative is https://cwiki.apache.org/confluence/display/KAFKA/KIP-286. That KIP's goal is: "We will change the behavior of producer.send() so that it does not block on metadata update". I think blocking on metadata is still needed; what we can do is move the blocking to before producer.send(). From this point of view, that KIP does not solve the issue.