Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In some scenarios, user will want to close the produce and purge all the unsent messages both in RecordAccumulator. (e.g. to preserve order if a send failed). Current KafkaProducer.close() method will try to send those messages. So we need to provide an interface that allow user to choose to close producer in which way.

Public Interfaces

Add another interface:

public void KafkaProducer.close(long timeout, TimeUnit timeUnit).

...

When timeout is 0, it purge all unsent messages and close the producer without waiting.

Proposed Changes

  1.  Add a forceClose flag to sender.
  2. When forceClose flag is set, sender will not send any more messages but purge the messages in RecordAccumulator and wake up the threads waiting on a callback:
    1. synchronized send
    2. flush()
  3. Cleanup metrics and release other resources.

Compatibility, Deprecation, and Migration Plan

This is a backward compatible change.

Rejected Alternatives

None