Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: AcceptedUnder Discussion

Discussion thread: here

JIRA: KAFKA-1660, KAFKA-1659

Released: 0.8.3

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This call waits some specified time for the producer to close. If the producer did not finish closing before timeout, it close the producer forcefully by failing all the unsent messages.

  • close(-10, TimeUnit.MILLISECONDS) - do not wait, fail all the incomplete requests and close the producer.close(0, TimeUnit.MILLISECONDS) - wait until all the messages are either sent successfully or failed
  • close(10000, TimeUnit.MILLISECONDS) - wait at most 10 seconds for the producer to send pending messages, if messages cannot be sent in 10 seconds, let the sender thread fail the rest of messages, wait for sender thread to complete and close producer.
  • close(-1, TimeUnit.MILLISECONDS) - IllegalArgumentException will be thrown.

Several addition considerations:

  • The close(timeout) should work when called from a user caller thread or the internal sender thread.
  • In sender thread only close(-10, TimeUnit.MILLISECONDS) should be called because:
    • close() or close(0, TimeUnit.MILLISECONDS) should not be called because that will make producer block forever.
    • close(500, TimeUnit.MILLISECONDS) has the same effect as close(-10, TimeUnit.MILLISECONDS) only except it will block for 500 milliseconds.
  • IMPORTANT, for the reason above, if a close() or close(timeout, TimeUnit) is called from sender thread (i.e. callback). An error message will be logged and close(0, TimeUnit.MILLISECONDS) will be called instead.
  • Because it is possible the close(timeout) is called for multiple times, it has to be idempotent.

...

When close(positive, TimeUnit.MILLISECONDS) is called, it will try to do a normal close first. If the normal close did not finish before timeout, it then close the producer forcefully. It will also wait the sender thread to finish.

If user calls close() in callback, it will block forever because the sender thread will try to join itself. So if a close() call is called from sender thread, an error message will be put in the log and close(0) will be called instead.

Compatibility, Deprecation, and Migration Plan

...

In addition if producer.close() were to return without actually closing the producer in time, the user could continue with other logic besides producer.abort(). "

We In KAFKA-1659, we decided to merge the abort() into producer.close(timeout) to avoid adding another interface. It is also agreed in KAFKA-1934.