Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state:  Accepted Under Discussion

Discussion threadhere

JIRAhere

...

We will change the behavior of producer.send() so that it does not block on metadata update. If metadata is not available, producer.send() will enqueue the request into the producer's buffer and return a future immediately to user. If the queue is already full, producer.send() will still block up to max.block.ms as does the existing Kafka producer's implementation.

 

Proposed

...

Changes

 

Before the proposed change of this KIP:

The Currently the following will happen in the user thread after producer.send() is called by the user thread:Steps:

1) Get partition information of the give topic to determine the partition for the given message. Wait up to max.block.ms for cluster metadata update if metadata is not available for the give topic.
2) Serialize key and value of the given message.
3) Determine the Maybe allocate memory to create a new batch for the corresponding partition of the message given the key of the message.
4) Allocate memory and put the message into a per-partition queue. Wait up to max.block.ms if there is not enough free memory yet.
4) Compress and put the message into the batch.
5) proucer.send() will unblock and return a future.Thread model:

Asynchronously, producer's IO thread will keep draining messages from each per-partition queue and send messages to brokers.

 


After the proposed change of this KIP, the :

The following will happen after happen in the user thread after producer.send() is called by the user thread:Steps: 

1) Serialize key and value of the given message.

2.1) If the partition information of the topic is not available:
    - Allocate memory and put memory to store the given uncompressed message. Wait up to max.block.ms if there is not enough free memory yet.
    - Put the message into a per-topic queue.

2.2) If the partition information of the topic is available:
    - Maybe allocate memory to create a new batch for the corresponding partition of the message. Wait up to max.block.ms if there is not enough free memory yet.
    - Compress and put the message into the batch.

3) For every message in every non-empty per-topic queue whose topic's partition information is available,
    - Maybe allocate memory to create a new batch for the corresponding partition of the message. Wait up to max.block.ms if there is not enough free memory yet.
    - Compress and put the message into the batch.
    - Dealloate the buffer for the message

4) proucer.send() will unblock and return a future.

Thread model: 

 

In the rare scenario where user thread sends a message when the metadata is not available, but never calls producer.send() after metadata is available, the message will never be put into the per-partition queue with the changes above. In order to take care of this scenario, this KIP should additional modify the Asynchronously, producer's IO thread will drain messageds from the per-topic queue into the existing per-partition queue after the metadata for the given topic is available. The IO thread will still perform the existing task of draining messages from each per-partition queue and send messages to brokerssuch that, if no new ProduceRequest can be generated and there exists non-empty per-topic queue whose topic's partition information is available, IO thread should do the step 3) above to move these messages into the per-partition queue.

 

Theoretical Performance Analysis

Here we analyze how the changes proposed in this KIP would change the performance of producer:

 

1) Worst case time of producer.send() suppose the producer queue is not full.

Before this KIP, the producer.send() may block up to max.block.ms for metadata to become available. This KIP improves the latency in this scenario by putting the message in the per-topic queue without blocking producer.send().

This KIP may also increase the time of producer.send() in the following scenario. The first time producer.send() is called after metadata becomes available, according to step 3) in the proposed changes above, producer.send() will compress messages and put these messages into the batch for the corresponding partitions. If there is a lot of messages in the per-topic queues, this can take some time. But amount of time to compress messages should be considerably less than the time needed to wait for metadata (and compress messages) because user threads are better utilized by not waiting for metadata.

 

2) Throughput

This change does not affect the rate at which IO thread can send messages to producer. Thus the difference in throughput should mainly be caused by how we do computation intensive tasks (i.e. serialization and compression) and how we wait for metadata.

If producer.send() is called infrequently, throughput should not be an issue. If producer.send() is called frequently, all computation intensive task, i.e. compression and serialization, will still be done by the user threads, which is the same as the existing producer implementation. Thus the change proposed in the KIP sohuld not reduce throughput terms of computation.

On the other hand, since user thread will not longer block on metadata, user thread can be better utilized to do other useful work. So the overall throughput should be better with the change proposed in this KIP.

 

3) Latency of message from the time producer.send() is called to the time the message is acknowledged

If metadata is available, the latency should be the same.

If metadata is not available, messages can not be sent. latency should be dominated by the time to wait for metadata and will still be the same. Once the metadata becomes available, user thread will be busy compressing these messages and putting them into the per-partition queue. Since the throughput should be better (see the analysis above), the latency in this case should also be slightly better.

 

Compatibility, Deprecation, and Migration Plan

...