...

We will change the behavior of producer.send() so that it does not block on metadata updates. If metadata is not available, producer.send() will enqueue the request into the producer's buffer and immediately return a future to the user. If the queue is already full, producer.send() will still block for up to max.block.ms, as the existing Kafka producer implementation does.

 

Proposed change

 

Before the proposed change of this KIP:

Currently, the following happens in the user thread after producer.send() is called:

Steps:

1) Get partition information of the given topic to determine the partition for the given message. Wait up to max.block.ms for a cluster metadata update if metadata is not available for the given topic.
2) Serialize key and value of the given message.
3) Determine the partition of the message given the key of the message.
4) Maybe allocate memory to create a new batch for the corresponding partition of the message. Wait up to max.block.ms if there is not enough free memory yet.
5) Compress and put the message into the batch.
6) producer.send() will unblock and return a future.
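The blocking behavior in step 1 can be illustrated with a small model. This is a minimal Python sketch rather than the producer's actual Java code: MetadataCache, wait_for_partitions, blocking_send, and the toy partitioner are hypothetical names introduced for illustration, and the batching and compression steps are omitted.

```python
import threading


class MetadataCache:
    """Hypothetical stand-in for the producer's cluster metadata."""

    def __init__(self):
        self._cond = threading.Condition()
        self._partitions = {}  # topic -> number of partitions

    def update(self, topic, num_partitions):
        # Called when a metadata response arrives; wakes up blocked senders.
        with self._cond:
            self._partitions[topic] = num_partitions
            self._cond.notify_all()

    def wait_for_partitions(self, topic, max_block_ms):
        # Block the caller until the topic is known or the timeout expires.
        with self._cond:
            known = self._cond.wait_for(
                lambda: topic in self._partitions,
                timeout=max_block_ms / 1000.0,
            )
            if not known:
                raise TimeoutError(
                    f"metadata for {topic!r} not available after {max_block_ms} ms"
                )
            return self._partitions[topic]


def blocking_send(metadata, topic, key, value, max_block_ms):
    # Step 1: may block the user thread for up to max_block_ms.
    num_partitions = metadata.wait_for_partitions(topic, max_block_ms)
    # Serialize key and value.
    key_bytes, value_bytes = key.encode(), value.encode()
    # Toy partitioner (an assumption, not Kafka's real hashing).
    partition = sum(key_bytes) % num_partitions
    # Batch allocation, compression, and the returned future are omitted.
    return partition, key_bytes, value_bytes
```

In this model, calling blocking_send before update() has been invoked for the topic stalls the caller for the full timeout, which is the behavior this KIP proposes to remove.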

Thread model:

Asynchronously, the producer's IO thread will keep draining messages from each per-partition queue and sending them to brokers.

 


After the proposed change of this KIP:

The following will happen in the user thread after producer.send() is called:

Steps:

1) Serialize key and value of the given message.

2.1) If the partition information of the topic is not available:
    - Allocate memory to store the given uncompressed message. Wait up to max.block.ms if there is not enough free memory yet.
    - Put the message into a per-topic queue.

2.2) If the partition information of the topic is available:
    - Maybe allocate memory to create a new batch for the corresponding partition of the message. Wait up to max.block.ms if there is not enough free memory yet.
    - Compress and put the message into the batch.

3) For every message in every non-empty per-topic queue whose topic's partition information is available,
    - Maybe allocate memory to create a new batch for the corresponding partition of the message. Wait up to max.block.ms if there is not enough free memory yet.
    - Compress and put the message into the batch.
    - Deallocate the buffer for the message.

4) producer.send() will unblock and return a future.
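The steps above can be sketched as a small single-threaded model. This is an illustrative Python sketch under stated assumptions, not the proposed Java implementation: ProducerModel, drain_ready_topics, and the toy partitioner are hypothetical, and memory allocation, the max.block.ms wait, compression, and the returned future are left out.

```python
from collections import defaultdict, deque


class ProducerModel:
    """Hypothetical model of the proposed user-thread send path."""

    def __init__(self):
        self.partitions = {}                    # topic -> partition count (known metadata)
        self.per_topic = defaultdict(deque)     # topic -> messages parked until metadata arrives
        self.per_partition = defaultdict(list)  # (topic, partition) -> batched messages

    def _append_to_batch(self, topic, key_bytes, value_bytes):
        # Toy partitioner (an assumption, not Kafka's real hashing).
        partition = sum(key_bytes) % self.partitions[topic]
        self.per_partition[(topic, partition)].append((key_bytes, value_bytes))

    def drain_ready_topics(self):
        # Step 3: move parked messages whose topic metadata is now known.
        for topic, parked in self.per_topic.items():
            if topic in self.partitions:
                while parked:
                    self._append_to_batch(topic, *parked.popleft())

    def send(self, topic, key, value):
        # Step 1: serialize key and value.
        key_bytes, value_bytes = key.encode(), value.encode()
        if topic not in self.partitions:
            # Step 2.1: metadata unknown, park the uncompressed message.
            self.per_topic[topic].append((key_bytes, value_bytes))
        else:
            # Step 2.2: metadata known, append straight to the partition batch.
            self._append_to_batch(topic, key_bytes, value_bytes)
        # Step 3: opportunistically flush any per-topic queue that became ready.
        self.drain_ready_topics()
        # Step 4: the real producer would return a future here.
```

In this model, send() never waits on metadata: a message for an unknown topic stays in per_topic until a later send() call finds the topic's partition information available.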

Thread model: 

Asynchronously, the producer's IO thread will still perform its existing task of draining messages from each per-partition queue and sending them to brokers.

In the rare scenario where the user thread sends a message while the metadata is not available, but never calls producer.send() again after the metadata becomes available, the message would never be put into the per-partition queue with the changes above. To take care of this scenario, this KIP should additionally modify the producer's IO thread such that, if no new ProduceRequest can be generated and there exists a non-empty per-topic queue whose topic's partition information is available, the IO thread performs step 3) above to move these messages into the per-partition queue.
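The IO-thread fallback can be sketched as one pass of a simplified loop. This is a hedged Python model, not the producer's actual sender code: io_thread_iteration and its arguments are hypothetical, "no new ProduceRequest can be generated" is approximated as "every per-partition queue is empty", and the partitioner is a toy.

```python
from collections import deque


def io_thread_iteration(per_partition, per_topic, known_partitions):
    """One pass of a simplified IO loop; returns the messages 'sent' to brokers."""
    # Existing task: drain every per-partition queue and send to brokers.
    sent = []
    for queue in per_partition.values():
        while queue:
            sent.append(queue.popleft())
    if sent:
        return sent

    # Fallback: nothing could be sent, so move parked messages whose topic
    # metadata is now known into the per-partition queues (step 3).
    for topic, parked in per_topic.items():
        if topic in known_partitions:
            while parked:
                key_bytes, value_bytes = parked.popleft()
                # Toy partitioner (an assumption, not Kafka's real hashing).
                partition = sum(key_bytes) % known_partitions[topic]
                per_partition.setdefault((topic, partition), deque()).append(
                    (key_bytes, value_bytes)
                )
    return sent
```

With this fallback, a message parked before metadata arrived is moved on one IO pass and sent on the next, even if the user thread never calls producer.send() again.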

 

Compatibility, Deprecation, and Migration Plan

...