...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Sometimes an application's threads block for up to max.block.ms when sending records with KafkaProducer#send. In some cases this exhausts the threads of the whole system for that period.

When an application tries to reduce max.block.ms to shorten the blocking time, it finds that the value cannot be set below the time needed to fetch metadata. Moreover, fetching metadata is a heavy operation that takes a long time.

...

So this KIP aims to let users set max.block.ms to a smaller value without having to worry about the metadata fetch.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

...

Binary log format

...

The network protocol and api behavior

...

Any class in the public packages under clients

  • org/apache/kafka/common/serialization

  • org/apache/kafka/common

  • org/apache/kafka/common/errors

  • org/apache/kafka/clients/producer

  • org/apache/kafka/clients/consumer (eventually, once stable)

Configuration, especially client configuration

...

Monitoring

...

Command line tools and arguments

No public interface is changed. Only the internal implementation of one private method changes:

org.apache.kafka.clients.producer.KafkaProducer#doSend

Two new configuration items are added for the producer.

...

Proposed Changes

For the changes, see the example PR: https://github.com/apache/kafka/pull/13335/files

...

1. Two configuration items added

| Producer configuration | Configuration item | Default value |
| includeWaitTimeOnMetadataInMaxBlockTime | max.block.ms.include.metadata | false |
| maxWaitTimeMsOnMetadata | max.block.metadata.ms | <max.block.ms |
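As an illustration of how an application might set the two items listed above, the following sketch builds a producer configuration with plain java.util.Properties. The two keys exist only in this proposal and are not part of any released Kafka client; the class name, the build helper, and the bootstrap address are hypothetical and chosen only for this example.

```java
import java.util.Properties;

public class ProposedProducerConfigSketch {
    // Keys proposed by this KIP; assumed here, not available in released clients.
    static final String INCLUDE_METADATA = "max.block.ms.include.metadata";
    static final String METADATA_MAX_BLOCK = "max.block.metadata.ms";

    // Hypothetical helper: collects the three timing-related settings in one place.
    static Properties build(long maxBlockMs, boolean includeMetadata, long maxBlockMetadataMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        props.setProperty(INCLUDE_METADATA, Boolean.toString(includeMetadata));
        props.setProperty(METADATA_MAX_BLOCK, Long.toString(maxBlockMetadataMs));
        return props;
    }

    public static void main(String[] args) {
        // A short max.block.ms combined with a longer, separate metadata budget.
        Properties props = build(1_000L, true, 5_000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would be passed to the KafkaProducer constructor in a client build that contains the proposed change.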

...

If users have no need for this feature, no change is required. Moreover, upgrading to the new client version does not affect existing behavior.

  • What impact (if any) will there be on existing users?
    No impact on existing users.
  • If we are changing behavior how will we phase out the older behavior?
    No existing behavior is changed.
  • If we need special migration tools, describe them here.
    None are needed.
  • When will we remove the existing behavior?
    There is no need to remove it.

Test Plan

...


We can test with the following matrix, assuming the metadata fetch takes N seconds (2 < N < 5):

| Case and result of sending a record | max.block.ms | includeWaitTimeOnMetadataInMaxBlockTime | maxWaitTimeMsOnMetadata |
| case 1: success | 10 seconds | default value: false (not set) | default value: 60 seconds (not set) |
| case 2: fail to send | 1 second | default value: false (not set) | default value: 60 seconds (not set) |
| case 3: success | 10 seconds | true | default value: 60 seconds (not set) |
| case 4: success | 1 second | true | 5 seconds |
| case 5: fail to send | 1 second | true | 1 second |
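The five cases can be reproduced with a small model of which time budget the metadata wait is checked against. This is only a sketch inferred from the expected results in the matrix, not the logic of the example PR: it assumes that the metadata wait is bounded by max.block.ms when the flag is false and by maxWaitTimeMsOnMetadata when the flag is true, and it ignores every other source of blocking in send. The class and method names are hypothetical.

```java
public class MaxBlockMatrixSketch {
    /**
     * Simplified model (an assumption): returns true when a metadata fetch that
     * takes metadataFetchMs finishes within the budget that applies to it.
     */
    static boolean metadataWaitFits(long metadataFetchMs,
                                    long maxBlockMs,
                                    boolean includeWaitTimeOnMetadataInMaxBlockTime,
                                    long maxWaitTimeMsOnMetadata) {
        // Flag false: the fetch is limited by max.block.ms.
        // Flag true: the fetch is limited by its own, separate budget.
        long budgetMs = includeWaitTimeOnMetadataInMaxBlockTime
                ? maxWaitTimeMsOnMetadata
                : maxBlockMs;
        return metadataFetchMs <= budgetMs;
    }

    public static void main(String[] args) {
        long n = 3_000L;          // assumed fetch time N, with 2 s < N < 5 s
        long unsetBudget = 60_000L; // default listed in the matrix when not set

        System.out.println("case 1: " + metadataWaitFits(n, 10_000L, false, unsetBudget));
        System.out.println("case 2: " + metadataWaitFits(n, 1_000L, false, unsetBudget));
        System.out.println("case 3: " + metadataWaitFits(n, 10_000L, true, unsetBudget));
        System.out.println("case 4: " + metadataWaitFits(n, 1_000L, true, 5_000L));
        System.out.println("case 5: " + metadataWaitFits(n, 1_000L, true, 1_000L));
    }
}
```

A real test would replace the model with a producer whose metadata fetch is delayed by N seconds and assert on whether send completes or times out.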


Rejected Alternatives

One alternative is to provide a new method that completes the metadata fetch outside the control of max.block.ms, which users would call before sending any record. For example, a user could call it before marking the service as ready.

...

Another possible alternative is https://cwiki.apache.org/confluence/display/KAFKA/KIP-286. That KIP's goal is: "We will change the behavior of producer.send() so that it does not block on metadata update". I think blocking on metadata is still needed; what we can do is move the blocking to before producer.send(). From this point of view, that KIP does not solve the issue.