Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: DRAFTING

Discussion thread: here

JIRA: here

Motivation

The KafkaConsumer is a complex client that incorporates different configurations for detecting consumer failure to allow remaining consumers to pick up the partitions of failed consumers. One such configuration is max.poll.interval.ms which is defined as:

...

This scenario is typically hit when the application code to process the consumer's fetched records takes too long (longer than max.poll.interval.ms). Hitting this timeout will cause the consumer to leave the group and trigger a rebalance (if it is not a static member as described in KIP-345: Introduce static membership protocol to reduce consumer rebalances). The consumer will end up rejoining the group if processing time was the only issue.

...

  1. Minimum processing time is long to begin with (ex. hit a database which takes 1 second per record minimum)
  2. Processing involves talking to a downstream service which sometimes causes a long response spike in processing time (ex. intermittent load issues)

In such cases, the user must fine-tune the configurations to fit their use-case however detection of such events is currently difficult. The only way to identify this scenario is by searching application logs or the user must record their processing time on their own. The consumer will log an error when such threshold max.poll.interval.ms is hit:

Code Block
Member {} sending LeaveGroup request to coordinator {} due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clientsConfiguration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

It would be beneficial to add a metric to record the average/max time between calls to poll so that users do not have to do it and operators can have this out of box (so operators do not need to ask application owners to do this).

Public Interfaces

TODO

Proposed Changes

TODODescribe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

...

As this KIP simply adds new metrics, there is no issue regarding compatibility, deprecation, or migration plan.

Rejected Alternatives