
Current state: Under Discussion

Discussion thread: here


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Current KafkaProducer.close() method will try to finish sending all pending messages before it returns. There are several motivations to add a close method with timeout in the producer.

  1. Sometimes, user will want to close a producer within a bounded time to avoid blocking on producer.close() for too long.
  2. One specific use case of 1) is that in some scenarios, user will want to close the produce immediately and fail all the unsent messages in RecordAccumulator. Some examples are:
    1. In mirror maker, if a send failed, we don't want to continue sending messages in RecordAccumulator to avoid reordering.
    2. For people who are using deployment tools, a service is expected to stop in given time. In that case people might want to have a bounded time to shutdown producer.

So we need to provide an interface that allow user to choose to close producer in which way.

Public Interfaces

Add another interface:

public void close(long timeout, TimeUnit timeUnit)

This call waits some specified time for the producer to close. If the producer did not finish closing before timeout, it close the producer forcefully by failing all the unsent messages.

Several addition considerations:

Proposed Changes

  1.  Add a forceClose flag to sender.
  2. When forceClose flag is set, sender will not send any more messages but fail all the messages in RecordAccumulator and wake up the threads waiting on a callback. These threads may be doing:
    1. synchronized send
    2. flush()
  3. Cleanup metrics and release other resources.

Compatibility, Deprecation, and Migration Plan

This is a backward compatible change.

Rejected Alternatives
