Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently producer.send() may block up to max.block.ms if metadata is not available. In general the following may happen if metadata is not available:
- producer.send() does block (if max.block.ms is 0) and user will drop message. However, this probably does not make sense to drop message if producer still has memory to buffer data (as configured by buffer.memory). In particular, when the producer is just instantiated, it is very likely that the metadata is not available when the producer.send() called for the first time, and we probably don't want to always drop the first few messages.
- producer.send() blocks user thread which can cause problem for latency sensitive front-end application.
- producer.send() blocks but user uses an extra thread and buffer to make sure that the user-thread does not block. This approach can work but requires non-trivial implementation on the user side to keep track of the the extra buffer to make sure it does not cause OOM etc.
Thus it appears that the existing behavior for producer.send() is not ideal if metadata is not available. This KIP proposes to improve the user experience and let producer.send() not block on metadata update.
Public Interfaces
We will change the behavior of producer.send() so that it does not block on metadata update. If metadata is not available, producer.send() will enqueue the request into the producer's buffer and return a future immediately to user. If the queue is already full, producer.send() will still block up to max.block.ms as does the existing Kafka producer's implementation.
Proposed change
Currently the following will happen after producer.send() is called by the user thread:
Steps:
1) Get partition information of the give topic. Wait up to max.block.ms for cluster metadata update if metadata is not available for the give topic.
2) Serialize key and value of the given message.
3) Determine the partition of the message given the key of the message.
4) Allocate memory and put the message into a per-partition queue. Wait up to max.block.ms if there is not enough free memory yet.
5) proucer.send() will unblock and return a future.
Thread model:
Asynchronously, producer's IO thread will keep draining messages from each per-partition queue and send messages to brokers.
After the proposed change of this KIP, the following will happen after producer.send() is called by the user thread:
Steps:
1) Serialize key and value of the given message.
2) Allocate memory and put the message into a per-topic queue. Wait up to max.block.ms if there is not enough free memory yet.
3) proucer.send() will unblock and return a future.
Thread model:
Asynchronously, producer's IO thread will drain messageds from the per-topic queue into the existing per-partition queue after the metadata for the given topic is available. The IO thread will still perform the existing task of draining messages from each per-partition queue and send messages to brokers.
Compatibility, Deprecation, and Migration Plan
Rejected alternative
- Make producer.send() non-blocking even if the queue is full.
Some user may still want to wait for a configurable amount of time on producer.send() if the queue is full instead of dropping messages immedidately. Users who want complete non-blocking producer.send() can set max.block.ms to 0.