...

The first step in addressing the above changes is to make metadata fetching asynchronous within the producer. This directly fixes (3) and opens the path to resolving (1) by allowing metadata requests to be batched together. Since the producer's interface is asynchronous and it inherently batches the sending of records to partitions, subjecting metadata fetching to a subset of the batching delay doesn't change the interaction or expectations of the client. This change alone should be enough to bring performance back to an acceptable level (pending verification).
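As a rough illustration of how non-blocking metadata requests could be batched, the sketch below collects topic names and releases them as a single batch once a linger period has elapsed. The class MetadataRequestBatcher, its methods, and the lingerMs parameter are hypothetical names chosen for this example; they are not part of the producer, and time is passed in explicitly to keep the sketch deterministic.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch, not the actual producer internals.
final class MetadataRequestBatcher {
    private final long lingerMs;
    // Topics awaiting metadata; duplicates collapse into one entry.
    private final Set<String> pendingTopics = new LinkedHashSet<>();
    private long firstPendingMs = -1;

    MetadataRequestBatcher(long lingerMs) {
        this.lingerMs = lingerMs;
    }

    // Registers interest in a topic and returns immediately (never blocks).
    void requestTopic(String topic, long nowMs) {
        if (pendingTopics.isEmpty()) {
            firstPendingMs = nowMs; // the linger window starts with the first topic
        }
        pendingTopics.add(topic);
    }

    // Returns the topics for one combined metadata request once the linger
    // period has elapsed since the first pending topic; otherwise empty.
    Set<String> pollBatch(long nowMs) {
        if (pendingTopics.isEmpty() || nowMs - firstPendingMs < lingerMs) {
            return new LinkedHashSet<>();
        }
        Set<String> batch = new LinkedHashSet<>(pendingTopics);
        pendingTopics.clear();
        firstPendingMs = -1;
        return batch;
    }
}
```

In this sketch, several topics first touched within the same linger window end up in one request rather than one request per topic.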

Specifically, KafkaProducer#waitOnMetadata would be made asynchronous in the cases where it must currently block. For uncached topics, the producer will maintain a queue of outstanding records to ensure proper ordering (both in the accumulator and for callback invocations) once topic metadata is resolved. Care must be taken to respect the linger period when fetching metadata, individual record timeouts while records are queued, and record buffer memory limits.
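The queueing behavior described here can be sketched as follows. This is a minimal illustration under stated assumptions, not the producer's implementation: PendingRecordQueue, Pending, enqueue, and drain are hypothetical names, record values are plain strings, and the memory limit is approximated by character count.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch, not the actual producer internals.
final class PendingRecordQueue {
    // A record waiting for topic metadata, with its absolute deadline.
    static final class Pending {
        final String topic;
        final String value;
        final long deadlineMs;

        Pending(String topic, String value, long deadlineMs) {
            this.topic = topic;
            this.value = value;
            this.deadlineMs = deadlineMs;
        }
    }

    private final Map<String, Queue<Pending>> byTopic = new HashMap<>();
    private final long maxBufferedSize;
    private long bufferedSize = 0;

    PendingRecordQueue(long maxBufferedSize) {
        this.maxBufferedSize = maxBufferedSize;
    }

    // Queues a record for an uncached topic; returns false if the
    // (approximate) buffer limit would be exceeded.
    boolean enqueue(String topic, String value, long nowMs, long timeoutMs) {
        long size = value.length(); // simplification: characters, not bytes
        if (bufferedSize + size > maxBufferedSize) {
            return false;
        }
        byTopic.computeIfAbsent(topic, t -> new ArrayDeque<>())
               .add(new Pending(topic, value, nowMs + timeoutMs));
        bufferedSize += size;
        return true;
    }

    // Called once metadata for the topic is resolved. Returns unexpired
    // records in their original send order; timed-out records are appended
    // to 'expired' so their callbacks can be failed by the caller.
    List<Pending> drain(String topic, long nowMs, List<Pending> expired) {
        List<Pending> ready = new ArrayList<>();
        Queue<Pending> queue = byTopic.remove(topic);
        if (queue == null) {
            return ready;
        }
        for (Pending p : queue) { // ArrayDeque iterates in FIFO order
            bufferedSize -= p.value.length();
            if (p.deadlineMs <= nowMs) {
                expired.add(p);
            } else {
                ready.add(p);
            }
        }
        return ready;
    }
}
```

Keeping one FIFO queue per topic preserves send order for records of that topic, while the deadline and size checks stand in for the record timeout and buffer memory limit mentioned above.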

To address (2): the producer currently maintains an expiry threshold for every topic, which is used to remove the topic from the working set at a future time (currently hard-coded to 5 minutes). While this does reduce the size of the topic working set, the producer continues to fetch metadata for these topics in every metadata request for the full expiry duration. This logic can be made more intelligent by measuring expiry from when the topic was last used, which allows the expiry duration to be reduced and improves cases where a large number of topics are touched intermittently. To control this behavior, a configuration variable topic.expiry.ms should be added to the producer configuration.
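A minimal sketch of expiry measured from last use might look like the following. TopicWorkingSet and its methods are hypothetical names for illustration only, and the expiryMs constructor argument stands in for the proposed topic.expiry.ms setting.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not the actual producer internals.
final class TopicWorkingSet {
    private final long expiryMs; // stands in for the proposed topic.expiry.ms
    private final Map<String, Long> lastUsedMs = new HashMap<>();

    TopicWorkingSet(long expiryMs) {
        this.expiryMs = expiryMs;
    }

    // Every send to a topic refreshes its last-used timestamp.
    void recordUse(String topic, long nowMs) {
        lastUsedMs.put(topic, nowMs);
    }

    // Evicts topics that have been idle for at least expiryMs, then returns
    // the topics that should be included in the next metadata request.
    Set<String> topicsToFetch(long nowMs) {
        lastUsedMs.entrySet().removeIf(e -> nowMs - e.getValue() >= expiryMs);
        return new HashSet<>(lastUsedMs.keySet());
    }
}
```

Because the timestamp is refreshed on each use, frequently used topics never leave the working set, while topics touched only once drop out after a single expiry interval.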

...

The impact on clients will be strictly internal performance improvements; no public APIs, protocols, or other external factors are being changed.

Rejected Alternatives

...

  • Allow the producer to specify interested topics. In doing so, many uncached topics could be fetched in a single request before records are produced for them, e.g. at startup. This would greatly alleviate the problem, but it requires clients to (1) implement a new producer call, and (2) know the working set of topics a priori. It would obviate the need for fetching metadata asynchronously, but without asynchronous fetching it would not resolve the throughput "bubble" that individual producer threads encounter when waiting for new topic metadata to be fetched.