
Status

Current state: Under Discussion

Discussion thread: http://mail-archives.apache.org/mod_mbox/kafka-users/201512.mbox/%3CCAL%2BArfWNfkpymkNDuf6UJ06CJJ63XC1bPHeT4TSYXKjSsOpu-Q%40mail.gmail.com%3E (user mailing list)

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The KafkaConsumer API centers around the poll() API, which is intended to be called in a loop. On every iteration of the loop, poll() returns a batch of records, which are then processed inline. For example, a typical consumption loop might look like this:

while (running) {
  // poll() blocks for up to the given timeout (in ms) and returns the next batch of records
  ConsumerRecords<K, V> records = consumer.poll(1000);
  records.forEach(record -> process(record));
  // commit the offsets of the records just processed
  consumer.commitSync();
}

In addition to fetching records, poll() is responsible for sending heartbeats to the coordinator and rebalancing when new members join the group and old members depart. The coordinator maintains a timer for every member in the group, which is reset when a heartbeat is received. If no heartbeat is received before the expiration of a session timeout, then the member is kicked out of the group and its partitions are reassigned to other members. If this happens while the consumer is processing a batch of records, then there is no direct way for the consumer to stop processing and rejoin the group. Instead, the loop will finish processing the full batch and only find out afterwards that it has been kicked out of the group. This can lead to duplicate consumption, since the partitions will already have been reassigned to another member of the group before offsets can be committed. It can also cause excessive rebalancing if the processing time frequently gets close to the session timeout.
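To make the failure mode concrete, here is a sketch of how the loop above behaves when processing overruns the session timeout (the handling shown is illustrative; CommitFailedException is the exception commitSync() raises once the group has rebalanced):

while (running) {
  ConsumerRecords<K, V> records = consumer.poll(1000);
  // if processing the batch takes longer than the session timeout, the member is evicted
  records.forEach(record -> process(record));
  try {
    consumer.commitSync();
  } catch (CommitFailedException e) {
    // the group rebalanced during processing: our partitions were reassigned, so
    // another member may consume this batch again, resulting in duplicates
  }
}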

Users can mitigate this problem by 1) increasing the session timeout, and 2) reducing the amount of data to process on each iteration of the loop. Increasing the session timeout is what we've recommended thus far, but it forces the user to accept the tradeoff of slower detection time for consumer failures. It's also worth noting that the time to rebalance depends crucially on the rate of heartbeats, which is limited by the processing time in each iteration of the poll loop. If it takes one minute on average to process records, then it will also generally take one minute to complete rebalances. In some cases, when the processing time cannot be easily predicted, this option is not even viable without also adjusting the amount of data returned.

For that, we give users the "max.partition.fetch.bytes" setting, which limits the amount of data returned for each partition in every fetch. This is difficult to leverage in practice to limit processing time unless the user knows the maximum number of partitions they will consume from and can predict processing time according to the size of the data. In some cases, processing time does not even correlate with the size of the records returned, but instead with the number of records returned. It is also difficult to deal with the effect of increased load, which can simultaneously increase the amount of data fetched and the time needed to process it.
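In configuration terms, these mitigations amount to tuning settings like the following (the values are illustrative only, and props is assumed to be the Properties map used to build the consumer):

// 1) tolerate longer processing at the cost of slower failure detection
props.put("session.timeout.ms", "60000");
// 2) reduce the data returned per partition in each fetch to bound processing time
props.put("max.partition.fetch.bytes", "65536");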

To summarize, it is difficult with the current API to tune the poll loop to avoid unexpected rebalances caused by processing overhead. In this KIP, we propose a way to limit the number of messages returned by the poll() call.

Public Interfaces

The only public interface affected by this proposal is client configuration: we add a new consumer configuration parameter, max.poll.messages, described under Proposed Changes below. No existing interfaces are removed or changed.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clients:

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Configuration, especially client configuration

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

In this KIP, we propose to solve this problem by giving users more control over the number of records returned in each call to poll(). In particular, we add a configuration parameter, max.poll.messages, which can be set in the configuration map given to the KafkaConsumer constructor. If the consumer fetches more records than the maximum provided in max.poll.messages, then it will retain the additional records and return them in subsequent calls to poll(). Below we describe the proposed API and provide a detailed design for its implementation.

Clearly the new configuration option will also need to be added to the documentation.
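For illustration, a minimal sketch of how the new option would be set (the property name max.poll.messages is the one proposed above; everything else is existing consumer configuration):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "example-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// proposed: each poll() returns at most 100 records; surplus fetched records
// are retained by the consumer and returned on subsequent calls
props.put("max.poll.messages", "100");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);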

Prefetching and fairness

TODO:

  • Specify how pre-fetching will work.
  • Specify how fairness will work among different partitions and/or topics.

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users.

Rejected Alternatives

Introducing a second poll argument

An alternative way of limiting the number of messages would be to introduce a second argument to poll(), which controls the maximum number of records to be returned. It would have the following advantages:

  • It would be more explicit. A counterargument is that there are many configuration options already. Avoiding yet another would not make that much difference.
  • It would allow the poll loop to dynamically change the max number of messages returned. However, this sounds like a non-requirement.

However, it would also have the following disadvantages:

  • It would introduce yet another public method that would have to be maintained for a long time in the future.
  • The max.poll.messages approach allows the consumer to size its allocations once at start-up instead of applying special logic on every poll() call.

It should also be mentioned that introducing a configuration parameter does not prevent us from introducing a second parameter to poll() in the future.
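For concreteness, the rejected alternative would have looked roughly like this (a hypothetical signature; the exact form was never specified on the list):

// hypothetical overload (rejected): bound the batch size at the call site
ConsumerRecords<K, V> records = consumer.poll(1000 /* timeout ms */, 100 /* max records */);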

Adding a KafkaConsumer#heartbeat()/ping() method

See http://mail-archives.apache.org/mod_mbox/kafka-users/201512.mbox/%3CCAJDuW=Auq8bM7JVaDmEBMmJscTZ812+F4rP_Wkwa6mR6JmMMHw@mail.gmail.com%3E.
