
...

Attribute Name: records-latency-max
Description: The latency of the partition with the longest latency in this sample window. Picks the largest partition records-latency-max of all partitions assigned to this client.

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"

Attribute Name: records-latency-max
Description: The latency of the partition with the longest latency. Picks the largest partition records-latency-max of all partitions of a topic assigned to this client.

kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"

Attribute Name: records-latency
Description: The latest latency of the partition.

Attribute Name: records-latency-max
Description: The maximum latency of the partition in this sample window.

Attribute Name: records-latency-avg
Description: The average latency of the partition.

Define a new optional Consumer property.

Name: latency.time.class
Description: Latency time class that implements org.apache.kafka.clients.consumer.LatencyTime. Allows the user to set up their own clock, used to obtain the current wall clock time when computing the Consumer Fetch Metrics metric records-latency.
Type: class
Valid Values:
Importance: low
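As a usage sketch, the proposed property could be set alongside the usual consumer properties. The property name comes from the table above; the class name com.example.MyLatencyTime is a placeholder, not part of the proposal:

```java
import java.util.Properties;

// Hypothetical example of configuring the proposed latency.time.class
// consumer property. Only the property name comes from the KIP; the
// implementation class name below is a placeholder.
class LatencyTimeConfigExample {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        // Optional: override the wall clock used to compute records-latency.
        props.put("latency.time.class", "com.example.MyLatencyTime");
        return props;
    }
}
```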

Define an interface the user can implement to provide current wall clock time for latency.

...

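The interface definition itself is elided in this diff view. A minimal sketch of what it might look like, given that the text names only the fully qualified interface org.apache.kafka.clients.consumer.LatencyTime and the default fallback to System.currentTimeMillis() (the method name and signature below are assumptions):

```java
// Hypothetical sketch of the proposed LatencyTime interface; the method
// name and signature are assumptions, as the KIP text only names the
// interface itself.
interface LatencyTime {
    // Return the current wall clock time, in epoch milliseconds, to use
    // when computing records-latency for a fetched record.
    long getWallClockTime();
}

// Default behavior described in the text: use the system clock.
class SystemLatencyTime implements LatencyTime {
    @Override
    public long getWallClockTime() {
        return System.currentTimeMillis();
    }
}
```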

Proposed Changes

Report latency metrics in the Kafka Consumer at the client (max latency) and partition level, similar to how consumer lag is currently reported.

Since Kafka 0.10.0 the Kafka Producer will, by default, populate the timestamp property of a ProducerRecord unless it is otherwise provided by the user. The default value is the current wall clock time, represented as a Unix epoch long in milliseconds, as returned by the Java Standard Library call System.currentTimeMillis(). In the general case, where we assume records are produced and consumed in the same timezone, on the same network, and on machines with active clock synchronization services, the latency may be calculated by taking the difference between the current wall clock time and the timestamp of a fetched record.

To accommodate use cases where the user needs more control over the clock used to determine wall clock time (i.e. when the produced records are in a different timezone, or have their timestamps overridden), the user can configure a custom implementation of the interface org.apache.kafka.clients.consumer.LatencyTime to return the correct time for the latency calculation. This would be a class configuration type provided by the user using consumer properties (ConfigDef.Type.CLASS).

UNIX epoch time is always represented as UTC time, and is therefore agnostic of any machine's particular default timezone.

The implementation to calculate latency would be very similar to how records-lag is implemented in Fetcher.fetchRecords and the FetchManagerRegistry. The latency metrics would be calculated once per poll for all partitions of records returned. The start timestamp would be extracted from the last record of each partition that is returned. To retrieve the current wall clock time, the implementation would use either a LatencyTime class configured by the user or the default implementation, which returns System.currentTimeMillis(). The latency would be calculated by taking the difference between the wall clock time and the record's start timestamp, and would be recorded by the client and partition sensors for the records-latency metric.
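The per-poll calculation described above can be sketched with plain Java types, independent of Kafka's actual Fetcher and sensor classes (the class and method names here are placeholders, not the KIP's implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the latency calculation described above. For each partition,
// take the timestamp of the last record returned in the poll, subtract it
// from the wall clock time, and record both the per-partition latency and
// a client-level maximum. Names are placeholders for illustration only.
class LatencySketch {
    // partition -> latest latency in ms (records-latency)
    private final Map<Integer, Long> partitionLatency = new HashMap<>();
    private long clientMaxLatency = Long.MIN_VALUE;

    // lastRecordTimestamps: partition -> timestamp of the last record
    // returned for that partition. wallClockMs would come from LatencyTime.
    void onPoll(Map<Integer, Long> lastRecordTimestamps, long wallClockMs) {
        for (Map.Entry<Integer, Long> e : lastRecordTimestamps.entrySet()) {
            long latency = wallClockMs - e.getValue();
            partitionLatency.put(e.getKey(), latency);
            // Client-level records-latency-max: max over all partitions.
            clientMaxLatency = Math.max(clientMaxLatency, latency);
        }
    }

    long partitionLatency(int partition) {
        return partitionLatency.get(partition);
    }

    long clientMaxLatency() {
        return clientMaxLatency;
    }
}
```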

At the partition level we can provide the latest calculated latency as well as the maximum and average latency within the metrics sample window. At the topic and client level we only provide the maximum latency, which is the max(records-latency-max) of all partitions assigned to a client for a particular topic, or for all topics. An average, or some other percentile, could also be reported. A sum of partition latencies would not make sense, because consumers are expected to consume partitions in parallel rather than serially.

Compatibility, Deprecation, and Migration Plan

...

I have implemented this solution using a Consumer Interceptor and can share it if there’s interest.

Kafka Lag Exporter

Kafka Lag Exporter is an open source project that can estimate the latency of Kafka partitions. It lets you track latency (or some aggregate of it) for apps without having to modify them in any way. It polls the latest offset of all partitions belonging to the consumer groups and topics to be reported, and maintains an internal lookup table per topic partition of offset and timestamp (the wall clock time at which the offset was measured). At the same time, consumer group metadata is polled to retrieve the consumer group id, topic partition, and last committed offset. With this information an estimate is computed by doing the following:

  1. Lookup interpolation table for a consumer group partition
  2. Find two points within the table that contain the last consumed offset
    1. If there are no two points that contain the last consumed offset then use the first and last points as input to the interpolation formula. This is the extrapolation use case.
  3. Interpolate inside (or extrapolate outside) the two points from the table we picked to predict a timestamp for when the last consumed message was first produced.
  4. Take the difference of the time of the last consumed offset (~ the current time) and the predicted timestamp to find the time lag.
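The interpolation in steps 2-4 can be sketched as follows. This is a simplified, self-contained version with hypothetical names; the actual project maintains such a table per topic partition:

```java
// Simplified sketch of the offset/timestamp interpolation described above.
// Given two (offset, timestamp) points from the lookup table and the last
// consumed offset, predict when that offset was produced, then compute the
// time lag against the current time. Names are illustrative placeholders.
class TimeLagSketch {
    // Linear interpolation between (offset1, time1) and (offset2, time2);
    // becomes extrapolation when consumedOffset lies outside the interval.
    static long predictProducedAt(long offset1, long time1,
                                  long offset2, long time2,
                                  long consumedOffset) {
        double slope = (double) (time2 - time1) / (offset2 - offset1);
        return time1 + Math.round(slope * (consumedOffset - offset1));
    }

    // Step 4: difference between "now" and the predicted produce time.
    static long timeLag(long nowMs, long predictedProducedAtMs) {
        return nowMs - predictedProducedAtMs;
    }
}
```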

https://www.lightbend.com/blog/monitor-kafka-consumer-group-latency-with-kafka-lag-exporter

While this tool works reasonably well, it has several limiting factors:

  1. It only works in cases where offsets are committed back to Kafka.  If an app or stream processor uses its own offset management, then the current offset of a partition cannot be obtained from Kafka.
  2. It can only produce an estimate.  Accuracy improves with higher-fidelity lookup tables (latest offsets looked up more frequently).

Disclosure: I (Sean Glover) am the author of the Kafka Lag Exporter project and the "Monitor Kafka Consumer Group Latency with Kafka Lag Exporter" blog post by Lightbend.