...

The KafkaConsumer is a complex client that offers several configurations for detecting consumer failure, so that the remaining consumers in a group can pick up the partitions of failed consumers. One such configuration is max.poll.interval.ms, which is defined as:

max.poll.interval.ms
  Description:  The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
  Type:         int
  Default:      300000
  Valid Values: [1,...]
  Importance:   medium

This timeout is typically hit when the application code that processes the consumer's fetched records takes longer than max.poll.interval.ms. Hitting the timeout causes the consumer to leave the group and triggers a rebalance (unless it is a static member as described in KIP-345: Introduce static membership protocol to reduce consumer rebalances). If processing time was the only issue, the non-static consumer will end up rejoining the group afterwards. This scenario is not ideal, as rebalancing disrupts processing and takes additional time.
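To make the failure mode concrete, the worst-case time between two poll() calls is roughly the number of records returned per poll multiplied by the per-record processing time. The sketch below (class and method names are illustrative, not part of the KIP) uses the documented defaults of 300000 ms for max.poll.interval.ms and 500 for max.poll.records:

```java
public class PollIntervalCheck {
    // Documented defaults for the two configs; real deployments may differ.
    static final long MAX_POLL_INTERVAL_MS = 300_000L; // max.poll.interval.ms
    static final int MAX_POLL_RECORDS = 500;           // max.poll.records

    /** Worst-case time between poll() calls if every fetched record
     *  takes perRecordMs to process. */
    static long worstCaseTimeBetweenPolls(long perRecordMs) {
        return MAX_POLL_RECORDS * perRecordMs;
    }

    /** True if that worst case would trip the rebalance timeout. */
    static boolean wouldExceedMaxPollInterval(long perRecordMs) {
        return worstCaseTimeBetweenPolls(perRecordMs) > MAX_POLL_INTERVAL_MS;
    }
}
```

With these defaults, 500 ms per record stays under the limit (250000 ms between polls), while 700 ms per record (350000 ms between polls) would evict the consumer from the group.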

...

  • Easily identify if/when max.poll.interval.ms needs to be changed (and to what value)
  • View trends/patterns
  • Verify max.poll.interval.ms was hit using the max metric when debugging consumption issues (if logs are not available)
  • Configure alerts to notify when average/max time is too close to max.poll.interval.ms

Example Usage

An application owner reports that their consumers are seeing the max.poll.interval.ms timeout error log mentioned in the previous section. The application owner may claim that their application code is fine and that the Kafka infrastructure is broken. The application has no instrumentation measuring its processing time, which makes it difficult for the Kafka operator to prove otherwise and resolve the issue.

The Kafka operator can only propose to:

  • Increase max.poll.interval.ms
  • Decrease max.poll.records

It is not clear what values to use without instrumentation, and it ends up taking a few attempts/deployments to find a value that works.


The metric proposed in this KIP would help in this scenario, as it would show conclusively that time-between-poll-max exceeded the configured max.poll.interval.ms. Additionally, it would provide an actual value that can be used as a starting point for increasing max.poll.interval.ms.
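As a hedged illustration of that "starting point" (nothing here is prescribed by the KIP, and the 1.5x headroom factor is an arbitrary example), an operator could derive candidate values from the observed time-between-poll-max:

```java
public class PollConfigSuggestion {
    /** Suggest a new max.poll.interval.ms from the observed
     *  time-between-poll-max, padded with headroom for variance.
     *  The 1.5x factor is an arbitrary example, not a recommendation. */
    static long suggestedMaxPollIntervalMs(long observedMaxMs) {
        return (long) (observedMaxMs * 1.5);
    }

    /** Alternatively, shrink max.poll.records proportionally so the
     *  observed worst case would fit under the existing timeout. */
    static int suggestedMaxPollRecords(int currentMaxPollRecords,
                                       long observedMaxMs,
                                       long maxPollIntervalMs) {
        if (observedMaxMs <= maxPollIntervalMs) return currentMaxPollRecords;
        return (int) (currentMaxPollRecords * maxPollIntervalMs / observedMaxMs);
    }
}
```

For example, an observed max of 350000 ms against the 300000 ms default would suggest either raising max.poll.interval.ms to roughly 525000 ms or lowering max.poll.records from 500 to about 428.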

It would also show long-term trends to identify if this incident was:

  • Unusual anomaly: Check health/performance of dependencies
  • Processing time was always close to max.poll.interval.ms: Increase max.poll.interval.ms or decrease max.poll.records to avoid hitting it due to variance
  • Processing time was gradually increasing: Check health/performance of dependencies

Public Interfaces

We will add the following metrics:

Metric Name            MBean Name                                                 Description
time-between-poll-avg  kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)   The average delay between invocations of poll().
time-between-poll-max  kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)   The max delay between invocations of poll().
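As a side note, MBeans with these names could be discovered through standard JMX once a consumer with JMX reporting is running in the JVM. The sketch below (class and method names are hypothetical) queries the platform MBean server with a wildcard on client-id:

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class PollMetricQuery {
    /** Find the consumer-metrics MBeans registered in this JVM.
     *  Returns an empty set unless a KafkaConsumer with JMX reporting
     *  is currently running in the same process. */
    static Set<ObjectName> findConsumerMetrics() {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // Wildcard on client-id, matching the MBean name pattern above.
            ObjectName pattern =
                new ObjectName("kafka.consumer:type=consumer-metrics,client-id=*");
            return server.queryNames(pattern, null);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

The time-between-poll-avg and time-between-poll-max attributes would then be readable from each matching MBean.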

Proposed Changes

Since we want this metric to measure the time the user takes between calls to poll(), we will store a long lastPollMs in KafkaConsumer. On every call to poll(), we will calculate the elapsed time since lastPollMs, record it, and then update lastPollMs.
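A minimal standalone sketch of that bookkeeping follows. In the actual implementation the elapsed time would be recorded into the consumer's metrics sensors (with Avg and Max stats); the class and field names here are illustrative, and the manual avg/max tracking stands in for those sensors:

```java
public class TimeBetweenPollTracker {
    // -1 marks "no poll seen yet", so the first call records nothing.
    private long lastPollMs = -1L;
    private long maxMs = 0L;
    private long totalMs = 0L;
    private long count = 0L;

    /** Called at the start of every poll(); records the elapsed time
     *  since the previous call and updates lastPollMs. */
    void onPoll(long nowMs) {
        if (lastPollMs != -1L) {
            long elapsed = nowMs - lastPollMs;
            maxMs = Math.max(maxMs, elapsed);
            totalMs += elapsed;
            count++;
        }
        lastPollMs = nowMs;
    }

    /** Value backing time-between-poll-avg. */
    double timeBetweenPollAvg() {
        return count == 0 ? 0.0 : (double) totalMs / count;
    }

    /** Value backing time-between-poll-max. */
    long timeBetweenPollMax() {
        return maxMs;
    }
}
```

For example, polls at t=0, t=100, and t=400 ms yield gaps of 100 and 300 ms, so the max is 300 and the average is 200.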

...