...
Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Consumer lag is a useful metric for monitoring how many records are queued to be processed. We can look at the lag of individual partitions or aggregate it. For example, we may want to monitor the maximum lag of any partition in our consumer subscription so we can identify hot partitions caused by an inadequate producer partitioning strategy. We may want to monitor the sum of lag across all partitions to get a sense of our total backlog of messages to consume. Lag in offsets is useful when you have a good understanding of your messages and processing characteristics, but it doesn’t tell us how far behind in time we are. That time-based measure is known as wait time in queueing theory, or more informally as latency.
...
As an example, below is a screenshot of a Grafana dashboard with a panel on the left showing the sum of consumer lag per partition and a panel on the right showing the partition with the highest latency. The latency panel also defines an alert that is triggered when latency exceeds 30 seconds over a 5-minute window.
Public Interfaces
Define new latency metrics under the Consumer Fetch Metrics group.
kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
| Attribute Name | Description |
|---|---|
| records-latency-max | The maximum latency of any partition in this sample window for this client |
kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
| Attribute Name | Description |
|---|---|
| records-latency | The latest latency of the partition |
| records-latency-max | The max latency of the partition |
| records-latency-avg | The average latency of the partition |
Define a new optional Consumer property.
| Name | Description | Type | Valid Values | Importance |
|---|---|---|---|---|
| latency.time.class | Latency time class that implements the LatencyTime interface | class | | low |
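As a rough illustration of how the proposed property might be set, the sketch below builds consumer properties with plain `java.util.Properties`. The helper class `LatencyConfigExample`, the broker address, the group id, and the implementation class name `com.example.NtpLatencyTime` are all hypothetical and not part of this proposal; only the property name `latency.time.class` comes from the table above.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
final class LatencyConfigExample {
    // Property name as proposed in the table above.
    static final String LATENCY_TIME_CLASS_CONFIG = "latency.time.class";

    // Builds consumer properties; passing null leaves the optional property unset,
    // in which case the consumer would fall back to its default wall clock.
    static Properties consumerProps(String latencyTimeClassName) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group
        if (latencyTimeClassName != null) {
            props.setProperty(LATENCY_TIME_CLASS_CONFIG, latencyTimeClassName);
        }
        return props;
    }
}
```

Calling `LatencyConfigExample.consumerProps("com.example.NtpLatencyTime")` would yield properties that select the custom clock, while `consumerProps(null)` omits the property entirely.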
Define an interface the user can implement to provide current wall clock time for latency.
```java
import java.io.Closeable;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

public interface LatencyTime extends Configurable, Closeable {

    /**
     * Get wall clock time given a provided topic and partition. Return wall clock time as a
     * UNIX epoch time in milliseconds of type long.
     */
    public long getWallClockTime(TopicPartition tp);

    /**
     * This is called when the latency time implementation is closed
     */
    public void close();
}
```
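A user-supplied implementation of the interface above might look roughly like the following sketch. So that it compiles without kafka-clients on the classpath, it declares minimal stand-ins for `TopicPartition`, `Configurable`, and `LatencyTime`; in a real build these would be Kafka's own types and the interface proposed here. The class `OffsetLatencyTime` and its `latency.time.offset.ms` setting are hypothetical, chosen only to show a clock that corrects for a known skew.

```java
import java.io.Closeable;
import java.util.Map;

// Stand-in for org.apache.kafka.common.TopicPartition (illustration only).
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Stand-in for org.apache.kafka.common.Configurable (illustration only).
interface Configurable {
    void configure(Map<String, ?> configs);
}

// Stand-in mirroring the interface proposed above.
interface LatencyTime extends Configurable, Closeable {
    long getWallClockTime(TopicPartition tp);
    @Override
    void close();
}

// Hypothetical implementation: system clock shifted by a configured offset.
final class OffsetLatencyTime implements LatencyTime {
    private long offsetMs = 0L;

    @Override
    public void configure(Map<String, ?> configs) {
        // "latency.time.offset.ms" is an assumed, non-standard setting.
        Object value = configs.get("latency.time.offset.ms");
        offsetMs = (value == null) ? 0L : Long.parseLong(value.toString());
    }

    @Override
    public long getWallClockTime(TopicPartition tp) {
        // The same clock is used for every partition in this sketch.
        return System.currentTimeMillis() + offsetMs;
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
```

The `TopicPartition` argument is unused here, but it would let an implementation consult a different time source per topic or partition if the producers' clocks differ.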
Proposed Changes
Report latency metrics in the Kafka Consumer at the client (max latency) and partition level, similar to how consumer lag is currently reported.
...
The implementation to calculate latency would be very similar to how records-lag is implemented in Fetcher.fetchRecords and the FetchManagerRegistry. The latency metrics would be calculated once per poll for all partitions of records returned. The start timestamp would be extracted from the last record of each partition that is returned. To retrieve the current wall clock time, it would use either a LatencyTime class implementation configured by the user or the default implementation, which returns System.currentTimeMillis(). The latency would be calculated as the difference between the wall clock time and the record's start timestamp. The latency would be recorded by the client and partition sensors for the records-latency metric.
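The calculation described above can be sketched in plain Java as follows. This is not the Fetcher code: the class `LatencyTracker` and its methods are hypothetical, partitions are keyed by a simple "topic-partition" string, and statistics are kept over the tracker's lifetime rather than over Kafka's sample windows. The wall clock is injected as a `LongSupplier`, standing in for either the default clock or a user-configured LatencyTime.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch of per-partition and client-level latency tracking.
final class LatencyTracker {

    // Running latency statistics for one partition, in milliseconds.
    static final class Stats {
        long latest;                 // corresponds to records-latency
        long max = Long.MIN_VALUE;   // corresponds to records-latency-max
        long sum;
        long count;

        // Corresponds to records-latency-avg.
        double avg() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    private final LongSupplier wallClock;
    private final Map<String, Stats> byPartition = new HashMap<>();

    LatencyTracker(LongSupplier wallClock) {
        this.wallClock = wallClock;
    }

    // Called once per poll for each partition, with the timestamp of the
    // last record returned for that partition. Returns the computed latency.
    long record(String topic, int partition, long lastRecordTimestampMs) {
        long latency = wallClock.getAsLong() - lastRecordTimestampMs;
        Stats s = byPartition.computeIfAbsent(topic + "-" + partition, k -> new Stats());
        s.latest = latency;
        s.max = Math.max(s.max, latency);
        s.sum += latency;
        s.count++;
        return latency;
    }

    // Returns the statistics for a partition, or null if none were recorded.
    Stats stats(String topic, int partition) {
        return byPartition.get(topic + "-" + partition);
    }

    // Client-level maximum across all partitions; 0 if nothing was recorded.
    long clientMax() {
        return byPartition.values().stream().mapToLong(s -> s.max).max().orElse(0L);
    }
}
```

With a production clock the tracker would be constructed as `new LatencyTracker(System::currentTimeMillis)`; injecting the clock also makes the arithmetic easy to check with a fixed time source.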
Compatibility, Deprecation, and Migration Plan
N/A
Rejected Alternatives
Use a Consumer Interceptor
...