...


Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Consumer lag is a useful metric for monitoring how many records are queued to be processed. We can look at the lag of individual partitions or aggregate it. For example, we may want to monitor the maximum lag of any partition in our consumer subscription to identify hot partitions caused by a poor partitioning strategy on the producer side. We may also want to monitor the sum of lag across all partitions to get a sense of our total backlog of messages to consume. Lag in offsets is useful when you have a good understanding of your messages and processing characteristics, but it doesn’t tell us how far behind we are in time. In queueing theory this is known as wait time; more informally, it is referred to as latency.

...

As an example, below is a screenshot of a Grafana dashboard with a panel showing the sum of consumer lag per partition on the left and a panel showing the partition with the highest latency on the right. The latency panel also defines an alert that is triggered when latency exceeds 30 seconds over a 5-minute window.

Public Interfaces

Define new latency metrics in the Consumer Fetch Metrics group.

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"

| Attribute Name | Description |
|---|---|
| records-latency-max | The maximum latency, in milliseconds, of any partition in this sample window for this client |

kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"

| Attribute Name | Description |
|---|---|
| records-latency | The most recent latency of the partition, in milliseconds |
| records-latency-max | The maximum latency of the partition, in milliseconds |
| records-latency-avg | The average latency of the partition, in milliseconds |
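To make the relationship between these attributes concrete, here is a minimal, hypothetical sketch of a single sample window: each partition keeps its latest, maximum, and average latency, and the client-level records-latency-max is the largest per-partition maximum. The class and method names are illustrative, not Kafka code. Kafka's real sensors compute max and avg over rolling sample windows (configured by metrics.sample.window.ms and metrics.num.samples), which this sketch collapses into one window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka code: one sample window of the proposed
// latency metrics. Partitions are keyed by a "topic-partition" string so the
// example compiles without kafka-clients on the classpath.
final class LatencyWindow {

    // Statistics for one partition within the window.
    static final class PartitionStats {
        private long latestMs;
        private long maxMs;
        private long sumMs;
        private long count;

        void record(long latencyMs) {
            latestMs = latencyMs;
            maxMs = count == 0 ? latencyMs : Math.max(maxMs, latencyMs);
            sumMs += latencyMs;
            count++;
        }

        // records-latency: the most recent measurement (NaN before any sample).
        double latestMs() { return count == 0 ? Double.NaN : latestMs; }

        // records-latency-max for this partition.
        double maxMs() { return count == 0 ? Double.NaN : maxMs; }

        // records-latency-avg for this partition.
        double avgMs() { return count == 0 ? Double.NaN : (double) sumMs / count; }
    }

    private final Map<String, PartitionStats> partitions = new HashMap<>();

    void record(String topicPartition, long latencyMs) {
        partitions.computeIfAbsent(topicPartition, k -> new PartitionStats()).record(latencyMs);
    }

    PartitionStats partition(String topicPartition) {
        return partitions.get(topicPartition);
    }

    // Client-level records-latency-max: the largest latency seen on any partition.
    double clientMaxMs() {
        double max = Double.NaN;
        for (PartitionStats stats : partitions.values()) {
            double m = stats.maxMs();
            if (Double.isNaN(max) || m > max) {
                max = m;
            }
        }
        return max;
    }
}
```

Recording 100 ms and 300 ms on orders-0 and 50 ms on orders-1 would report a latest of 300, a max of 300, and an average of 200 for orders-0, and a client-level max of 300.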

Define a new optional Consumer property.

| Name | Description | Type | Valid Values | Importance |
|---|---|---|---|---|
| latency.time.class | Class that implements org.apache.kafka.clients.consumer.LatencyTime. Lets the user set up their own clock for the current wall clock time used to compute the Consumer Fetch Metrics metric records-latency. | class | | low |
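A consumer would opt into a custom clock through its ordinary configuration. The sketch below assumes the latency.time.class key proposed here; the implementation class name com.example.OffsetLatencyTime is hypothetical, and the remaining entries are standard consumer settings. Leaving latency.time.class out would keep the default clock.

```java
import java.util.Properties;

// Sketch of a consumer configuration that opts into a custom latency clock.
// "latency.time.class" is the property proposed by this KIP; the class name
// com.example.OffsetLatencyTime is hypothetical.
final class LatencyConsumerConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "latency-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Optional: omit this entry to use the default System.currentTimeMillis() clock.
        props.put("latency.time.class", "com.example.OffsetLatencyTime");
        return props;
    }
}
```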

Define an interface that the user can implement to provide the current wall clock time used to calculate latency.

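```java
package org.apache.kafka.clients.consumer;

import java.io.Closeable;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

/**
 * Supplies the current wall clock time used to compute the records-latency metrics.
 */
public interface LatencyTime extends Configurable, Closeable {
    /**
     * Returns the current wall clock time for the given topic partition,
     * as a UNIX epoch timestamp in milliseconds.
     */
    long getWallClockTime(TopicPartition tp);

    /**
     * Called when the latency time implementation is closed.
     */
    @Override
    void close();
}
```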
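One reason to supply a custom LatencyTime is clock skew: if the consumer host's clock lags the clock that produced the record timestamps, the computed latency is understated and can even be negative. The sketch below is a hypothetical implementation that shifts the local clock by a configured offset. The config key latency.time.offset.ms and the class OffsetLatencyTime are illustrative assumptions, and Kafka's Configurable and TopicPartition are replaced by minimal local stand-ins so the example compiles without kafka-clients.

```java
import java.io.Closeable;
import java.util.Map;
import java.util.function.LongSupplier;

// Minimal local stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Local stand-in for the proposed interface; configure mirrors Kafka's Configurable.
interface LatencyTime extends Closeable {
    void configure(Map<String, ?> configs);

    long getWallClockTime(TopicPartition tp);

    @Override
    void close();
}

// Hypothetical implementation: corrects the local clock by a fixed offset,
// e.g. a known skew between this host and the clock that stamped the records.
final class OffsetLatencyTime implements LatencyTime {
    // Hypothetical config key, not part of the proposal.
    static final String OFFSET_CONFIG = "latency.time.offset.ms";

    private final LongSupplier baseClock;
    private long offsetMs;

    OffsetLatencyTime() {
        this(System::currentTimeMillis);
    }

    // Injectable base clock, useful for tests.
    OffsetLatencyTime(LongSupplier baseClock) {
        this.baseClock = baseClock;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get(OFFSET_CONFIG);
        offsetMs = value == null ? 0L : Long.parseLong(value.toString());
    }

    @Override
    public long getWallClockTime(TopicPartition tp) {
        // Same correction for every partition in this sketch.
        return baseClock.getAsLong() + offsetMs;
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
```

Because getWallClockTime receives the TopicPartition, an implementation could also apply a different correction per partition. Which clock matters depends on the topic's message.timestamp.type: with CreateTime the timestamps come from producers, and with LogAppendTime they come from the broker.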

Proposed Changes

Report latency metrics in the Kafka Consumer at the client level (max latency) and at the partition level, similar to how consumer lag is currently reported.

...

The implementation to calculate latency would be very similar to how records-lag is implemented in Fetcher.fetchRecords and the FetchManagerRegistry. The latency metrics would be calculated once per poll for every partition with records returned. The start timestamp is taken from the last record returned for each partition. The current wall clock time comes from either a LatencyTime implementation configured by the user or the default implementation, which returns System.currentTimeMillis(). The latency is the difference between the wall clock time and the record's start timestamp, and it is recorded by the client and partition sensors for the records-latency metrics.
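The per-poll calculation can be sketched as follows. This is a hypothetical illustration, not the proposed Fetcher change: partitions are plain "topic-partition" strings, each partition's records are reduced to their timestamps in offset order, and a ToLongFunction stands in for LatencyTime.getWallClockTime. For each partition with records, it takes the last record's timestamp and subtracts it from the current wall clock time.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.ToLongFunction;

// Hypothetical sketch of the per-poll latency calculation; not Kafka code.
final class PollLatencyCalculator {
    // Stands in for LatencyTime.getWallClockTime(TopicPartition).
    private final ToLongFunction<String> wallClock;

    PollLatencyCalculator(ToLongFunction<String> wallClock) {
        this.wallClock = wallClock;
    }

    // Default clock, matching the proposal's default of System.currentTimeMillis().
    PollLatencyCalculator() {
        this(tp -> System.currentTimeMillis());
    }

    /**
     * Computes one latency sample per partition for the records returned by a poll.
     * timestampsByPartition maps each partition to its record timestamps in offset order.
     */
    Map<String, Long> latencies(Map<String, List<Long>> timestampsByPartition) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, List<Long>> entry : timestampsByPartition.entrySet()) {
            List<Long> timestamps = entry.getValue();
            if (timestamps.isEmpty()) {
                continue; // no records for this partition in this poll
            }
            // The start timestamp comes from the last record returned for the partition.
            long lastTimestamp = timestamps.get(timestamps.size() - 1);
            long now = wallClock.applyAsLong(entry.getKey());
            result.put(entry.getKey(), now - lastTimestamp);
        }
        return result;
    }

    // Client-level value: the maximum latency over the partitions in this poll.
    static OptionalLong maxLatency(Map<String, Long> latencies) {
        return latencies.values().stream().mapToLong(Long::longValue).max();
    }
}
```

With a wall clock of 10,000 ms, a partition whose last returned record has timestamp 9,500 ms gets a latency of 500 ms, and a partition with no records in the poll produces no sample.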

Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

Use a Consumer Interceptor

...