Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: Under Discussion

Discussion thread:   here

JIRA: KAFKA-1660

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Public Interfaces

Add another interface:

public void KafkaProducer. close(long timeout, TimeUnit timeUnit).

This call waits some specified time for the producer to close. If the producer did not finish closing before timeout, it close the producer forcefully by failing all the unsent messages.

  • close(-1, TimeUnit.MILLISECONDS) - do not wait, fail all the incomplete requests and close the producer.
  • close(0, TimeUnit.MILLISECONDS) - wait until all the messages are either sent successfully or failed
  • close(10000, TimeUnit.MILLISECONDS) - wait at most 10 seconds for the producer to send pending messages, if messages cannot be sent in 10 seconds, fail the rest of messages and close producer.

Several addition considerations:

  • The close(timeout) should work when called from a user caller thread or the internal sender thread.

When timeout is negative, it fails all unsent messages and close the producer without waiting.

...

  • In sender thread only close(-1, TimeUnit.MILLISECONDS) should be called because:
    • close() or close(0, TimeUnitMILLISECONDS) should not be called because that will make producer block forever.
    • close(500, TimeUnit.MILLISECONDS) has the same effect as close(-1, TimeUnit.MILLISECONDS) only except it will block for 500 milliseconds.
  • Because it is possible the close(timeout) is called for multiple times, it has to be idempotent.

Proposed Changes

  1.  Add a forceClose flag to sender.
  2. When forceClose flag is set, sender will not send any more messages but fail all the messages in RecordAccumulator and wake up the threads waiting on a callback. These threads may be doing:
    1. synchronized send
    2. flush()
  3. Cleanup metrics and release other resources.

...