...

To summarize, it is difficult with the current API to tune the poll loop to avoid unexpected rebalances caused by processing overhead. To address this problem, we propose a way to limit the number of messages returned by the poll() call.

...

In this KIP, we propose to solve the problem above by giving users more control over the number of records returned in each call to poll(). In particular, we add a configuration parameter, max.poll.records, which can be set in the configuration map passed to the KafkaConsumer constructor. If the consumer fetches more records than the maximum provided in max.poll.records, then it will retain the additional records until the next call to poll(). As before, poll() will continue to send heartbeats in accordance with the configured heartbeat interval, and offset commits will use the position of the last offset returned to the user. The default configuration preserves the current behavior, which places no upper bound on the number of messages returned in a call to poll(). Since the proposed change only affects configuration, users should not need to make any code changes to take advantage of it.
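As a sketch of how the new setting would be supplied, the following builds an ordinary consumer configuration map with max.poll.records added. The class name, group id, and other property values here are illustrative; only max.poll.records is the parameter proposed by this KIP.

```java
import java.util.Properties;

public class ConsumerConfigExample {
    // Builds a consumer configuration including the proposed max.poll.records
    // setting. All values besides max.poll.records are ordinary example
    // properties; 100 is an arbitrary cap chosen for illustration.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("max.poll.records", "100"); // cap on records returned per poll()
        return props;
        // The map is then passed to the constructor as usual:
        // new KafkaConsumer<String, String>(props, keyDeserializer, valueDeserializer)
    }
}
```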

Below we discuss a couple of implementation details.

Ensuring Fair Consumption

Since we are only returning a subset of the records that have been fetched, we must decide which ones to return. In a naive implementation, we might iterate through whatever collection the records are stored in until we've accumulated max.poll.records records and return these without further thought. In the worst case, this could leave some partitions indefinitely unconsumed, since the consumer may fetch new data for the returned partitions before the next call to poll(). The data would keep getting replenished for the returned partitions, and the unlucky partitions stuck at the end would be starved.

Ideally, we would include records from each partition proportionally according to the number of assigned partitions. For example, if two partitions are assigned and max.poll.records is set to 100, then we would return 50 records from each partition. However, this is subject to the availability of the fetched data. If records are only available from one of the partitions, then we should return 100 records from that partition and none from the other. In general, we can't make any guarantees about actual delivery (Kafka itself does not return data proportionally in a fetch response for each requested partition), but we should give each partition a fair chance for consumption.

One way to achieve this would be to pull records from each partition in a round-robin fashion. If partitions A, B, and C were assigned and max records was set to 30, we would first try to pull 10 messages from A, then 10 from B, and so on. We would have to keep track of which partition we left off at and begin there on the next call to avoid the starvation problem mentioned. This may require multiple passes over the partitions since not all of them will have data available.

A simpler approach would be to pull as many records as possible from each partition in a similar round-robin fashion. Using the same example, we would first try to pull 30 records from A. If there was still space available, we would try to pull from B, and so on. As before, we would keep track of where we left off. This would not achieve the ideal balancing described above, but it still ensures that each partition gets consumed and requires only a single pass over the fetched records. We favor this approach for simplicity.
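A minimal sketch of this greedy round-robin drain, assuming fetched-but-unreturned records buffered per partition. The class and method names are hypothetical and simplified, not the actual consumer internals; partitions are represented as strings and records as integers for brevity.

```java
import java.util.*;

// Sketch of the favored strategy: drain as many buffered records as possible
// from each partition in turn, up to maxPollRecords, remembering where we
// stopped so that the next call resumes from the partition that still had data.
public class FairDrain {
    private final List<String> partitions;              // assignment in a fixed order
    private final Map<String, Deque<Integer>> buffered; // fetched, not yet returned
    private int nextIndex = 0;                          // partition to start from next

    public FairDrain(Map<String, Deque<Integer>> buffered) {
        this.partitions = new ArrayList<>(buffered.keySet());
        this.buffered = buffered;
    }

    public List<Integer> poll(int maxPollRecords) {
        List<Integer> out = new ArrayList<>();
        int n = partitions.size();
        for (int i = 0; i < n; i++) {
            int idx = (nextIndex + i) % n;
            Deque<Integer> queue = buffered.get(partitions.get(idx));
            while (!queue.isEmpty() && out.size() < maxPollRecords) {
                out.add(queue.pollFirst());
            }
            if (out.size() == maxPollRecords) {
                // Resume from this partition if it still has data, else the next one.
                nextIndex = queue.isEmpty() ? (idx + 1) % n : idx;
                return out;
            }
        }
        return out; // all buffered data drained without hitting the cap
    }
}
```

With partitions A = [1, 2, 3], B = [4, 5], C = [6] and a cap of 4, the first poll drains A fully and takes one record from B; the second poll resumes at B and returns the remainder, so no partition is starved.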

Prefetching

The KafkaConsumer implements a prefetching optimization to improve throughput. Before returning a set of records to the user in poll(), the consumer will initiate the next round of fetches in order to pipeline fetching with message processing. While the consumer is processing the current batch of records, the broker can handle the consumer's fetch requests (including blocking for new data if fetch.min.bytes is configured). The idea is to have data already available when the consumer finishes processing and invokes poll() again.
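The pipelining idea can be sketched as follows, with a background thread standing in for the broker round trip. All names here are illustrative, and the real consumer does this with non-blocking network I/O rather than a separate thread; the point is only that the fetch for batch N+1 is in flight while the caller processes batch N.

```java
import java.util.*;
import java.util.concurrent.*;

// Illustrative sketch of the prefetch pipeline: before batch N is returned to
// the caller, the fetch for batch N+1 has already been submitted, so it
// overlaps with the caller's record processing.
public class PrefetchSketch {
    private final ExecutorService fetcher = Executors.newSingleThreadExecutor();
    private Future<List<String>> inFlight; // the pipelined fetch, if any
    private int nextBatch = 0;

    // Stand-in for a network fetch; a real consumer sends a fetch request here.
    private List<String> fetch(int batch) {
        return Arrays.asList("record-" + batch + "-a", "record-" + batch + "-b");
    }

    public List<String> poll() {
        try {
            if (inFlight == null) {                  // first call: nothing pipelined yet
                inFlight = fetcher.submit(() -> fetch(nextBatch++));
            }
            List<String> ready = inFlight.get();     // ideally already complete
            inFlight = fetcher.submit(() -> fetch(nextBatch++)); // start the next fetch
            return ready;                            // caller processes while it runs
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void close() { fetcher.shutdownNow(); }
}
```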

...