
Status

Current state: Under Discussion

Discussion thread

JIRA: KAFKA-1865

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently there is no way to force the sending of all buffered messages in the new Java producer.

Currently, if you want to send a batch of messages and wait until all of them have been sent (without caring about the individual errors or offsets), you have to do something like this:

Current Usage
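import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

List<Future<RecordMetadata>> results = new ArrayList<>();
for (String messageToSend : batch) {
    Future<RecordMetadata> result = producer.send(new ProducerRecord<>("my-topic", messageToSend));
    results.add(result);
}
// Wait for every record; get() throws if that record's send failed
for (Future<RecordMetadata> result : results)
    result.get();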

There are two problems with this usage. First, it is somewhat tedious to iterate through all the futures just to wait until they have all been sent.

Second, if you set linger.ms > 0 to encourage batching, which is likely a good idea for this kind of use case, then the second for loop can block for up to linger.ms milliseconds, because the producer does not send the records immediately but waits for more to arrive. Since your code isn't sending any more records, this wait is pointless.

Public Interfaces

The proposal is to add a new method to the Producer interface:

Flush method
    /**
     * Immediately flush any records currently buffered in the producer. This will cause any unsent records to immediately
     * be available to send regardless of the configured linger.ms setting. The method will block until all previously
     * sent records have completed sending (either successfully or with an error).
     */
    public void flush();
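To make the javadoc above concrete, here is a minimal, hypothetical model of a producer with linger.ms batching and the proposed flush(). The class ToyLingerProducer, its single background sender thread, and the fake offsets are assumptions for illustration only; this is not how KafkaProducer is implemented. send() only appends to a buffer; the sender waits until linger.ms has elapsed for the oldest buffered record unless flush() wakes it early, and flush() then blocks until every record that was buffered when it was called has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical in-memory model of the proposed flush() semantics (not KafkaProducer).
 * "Sending" a record just completes its future with a fake offset.
 */
class ToyLingerProducer implements AutoCloseable {
    private final long lingerMs;
    private final List<CompletableFuture<Long>> buffer = new ArrayList<>(); // unsent records
    private long firstBufferedAtMs;          // append time of the oldest unsent record
    private boolean flushRequested = false;
    private boolean closed = false;
    private long nextOffset = 0;
    private final Thread sender = new Thread(this::runSender, "toy-sender");

    ToyLingerProducer(long lingerMs) {
        this.lingerMs = lingerMs;
        sender.setDaemon(true);
        sender.start();
    }

    /** Appends the record and returns immediately; the sender thread completes it later. */
    synchronized CompletableFuture<Long> send(String value) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (buffer.isEmpty()) firstBufferedAtMs = System.currentTimeMillis();
        buffer.add(result); // the toy model ignores the payload itself
        notifyAll();
        return result;
    }

    /** Makes buffered records sendable now, ignoring linger.ms, and blocks until they complete. */
    void flush() {
        List<CompletableFuture<Long>> pending;
        synchronized (this) {
            if (buffer.isEmpty()) return; // everything sent earlier has already completed
            pending = new ArrayList<>(buffer);
            flushRequested = true;
            notifyAll(); // wake the sender instead of letting it wait out linger.ms
        }
        // A record that failed still counts as completed, so swallow the failure here.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]))
                .exceptionally(error -> null)
                .join();
    }

    // Caller must hold the lock.
    private boolean readyToSend() {
        if (buffer.isEmpty()) return false;
        return flushRequested || System.currentTimeMillis() - firstBufferedAtMs >= lingerMs;
    }

    private void runSender() {
        while (true) {
            synchronized (this) {
                try {
                    while (!closed && !readyToSend()) {
                        // Sleep until linger.ms expires for the oldest record, or indefinitely if empty.
                        long waitMs = buffer.isEmpty() ? 0
                                : Math.max(1, firstBufferedAtMs + lingerMs - System.currentTimeMillis());
                        wait(waitMs);
                    }
                } catch (InterruptedException e) {
                    return;
                }
                if (closed && buffer.isEmpty()) return;
                List<CompletableFuture<Long>> batch = new ArrayList<>(buffer);
                buffer.clear();
                flushRequested = false;
                // Completing under the lock means "not in buffer" always implies "completed".
                for (CompletableFuture<Long> f : batch) f.complete(nextOffset++);
            }
        }
    }

    @Override
    public void close() {
        synchronized (this) {
            closed = true;
            notifyAll();
        }
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Under this model, the batch loop from the Motivation section becomes a series of send() calls followed by one flush(), and the call returns without waiting out linger.ms. Because flush() only waits for completion (successful or not), a caller that needs per-record errors or offsets would still inspect the returned futures.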

One nuance of this interface is what it means when flush is called in a multithreaded use case. Does it block sends in other threads? Can two threads call flush at the same time, and if so, what does that mean?

Here are the proposed multithreaded semantics: flush does not block send calls from other threads. Any send from any thread that completes before flush is initiated will itself have completed when the flush call returns (flush flushes all buffered records, not just the ones sent by the calling thread). Other threads can initiate sends while a flush is in progress; these sends won't be blocked and may well be included in the requests resulting from the flush, but there is no guarantee one way or the other.
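One way to obtain exactly these guarantees, sketched here under our own assumptions, is for flush to snapshot the set of records that are still incomplete at the moment it starts and to wait only on that snapshot. The class IncompleteRecords and its methods are hypothetical names, and the sketch models only the waiting part, not waking the sender. Because the set is a concurrent set with weakly consistent iteration, a send that races with the snapshot may or may not be included, which matches the "no guarantee one way or the other" clause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical tracker for the proposed multithreaded semantics: flush waits for
 * the records that were incomplete when it started, and nothing else.
 */
class IncompleteRecords {
    private final Set<CompletableFuture<Long>> incomplete = ConcurrentHashMap.newKeySet();

    /** Called from send() on any thread: track the record until it completes. */
    CompletableFuture<Long> register() {
        CompletableFuture<Long> record = new CompletableFuture<>();
        incomplete.add(record);
        // Success and failure both count as "completed" for flush purposes.
        record.whenComplete((offset, error) -> incomplete.remove(record));
        return record;
    }

    /** Records a flush starting now must wait for; sends racing with this call may or may not appear. */
    List<CompletableFuture<Long>> snapshot() {
        return new ArrayList<>(incomplete);
    }

    /** Blocks until every record in the snapshot has completed; later sends are not waited on. */
    static void awaitAll(List<CompletableFuture<Long>> snapshot) {
        for (CompletableFuture<Long> record : snapshot) {
            try {
                record.join();
            } catch (RuntimeException failedSend) {
                // A failed send has still completed, so flush moves on.
            }
        }
    }

    /** flush(): snapshot what is outstanding now, then wait for exactly those records. */
    void flush() {
        awaitAll(snapshot());
    }
}
```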

The flush call itself will hold a lock so that only one flush call can be in progress at any given time. This is for implementation simplicity: it is tricky to have multiple flush calls in progress at once and to track the point up to which each of them guarantees completion.
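The single-flush restriction can be sketched with a dedicated lock that only flush() acquires: concurrent flush() calls queue up behind each other, while send() never touches that lock and so is never blocked by it. SerializedFlush, the awaitPendingRecords callback, and the counters are hypothetical and exist only to make this behavior observable; the callback stands in for "wake the sender and wait for the records outstanding when flush started".

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical sketch of "one flush at a time": flush() takes a dedicated lock,
 * send() never does, so sends proceed while a flush is waiting.
 */
class SerializedFlush {
    private final ReentrantLock flushLock = new ReentrantLock();
    private final AtomicInteger flushesInProgress = new AtomicInteger();
    private final AtomicInteger maxConcurrentFlushes = new AtomicInteger();
    private final AtomicInteger sends = new AtomicInteger();

    /** Stand-in for send(): appends a record without touching flushLock. */
    void send() {
        sends.incrementAndGet();
    }

    /** awaitPendingRecords stands in for "wake the sender and wait for the outstanding records". */
    void flush(Runnable awaitPendingRecords) {
        flushLock.lock(); // a second caller blocks here until the first flush finishes
        try {
            int active = flushesInProgress.incrementAndGet();
            maxConcurrentFlushes.accumulateAndGet(active, Math::max);
            awaitPendingRecords.run();
        } finally {
            flushesInProgress.decrementAndGet();
            flushLock.unlock();
        }
    }

    int sendCount() {
        return sends.get();
    }

    int observedMaxConcurrentFlushes() {
        return maxConcurrentFlushes.get();
    }
}
```

Using a separate lock object, rather than synchronizing flush() on the producer itself, keeps the flush lock independent of any monitor that send() might use internally.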

Proposed Changes

The API description above covers essentially the full change. There is a first-pass implementation here.

Compatibility, Deprecation, and Migration Plan

This adds a new method to the producer API and changes the Producer.java interface. We have said that this interface is really for our own use, and we make no promise to other implementors of it to avoid changes.

No existing users of KafkaProducer should be broken, though.

Rejected Alternatives

The primary alternative I considered was supporting multiple concurrent flushes. The challenge there is really just the implementation, and we can always add it later if we find a good way.
