...

Attribute Name: records-latency-max

Description: The latency of the partition with the longest latency. Picks the largest partition record-latency-max of all partitions assigned to this client.

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"

Attribute Name: records-latency-max

Description: The latency of the partition with the longest latency. Picks the largest partition record-latency-max of all partitions of a topic assigned to this client.

kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"

Attribute Name: records-latency

Description: The latest latency of the partition

Attribute Name: records-latency-max

Description: The maximum latency of the partition in this sample window

Attribute Name: records-latency-avg

Description: The average latency of the partition

Define a new optional Consumer property.

...

Name: latency.time.class

Description: Latency time class that implements org.apache.kafka.clients.consumer.LatencyTime. Allows the user to set up their own clock to be used to obtain the current wall clock time when calculating the records-latency consumer fetch metric.

Type: class

Valid Values:

Importance: low

Define an interface the user can implement to provide current wall clock time for latency.
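The code block for this interface did not survive the export, so the following is a minimal sketch of what the surrounding text describes: a single method returning wall clock time for a partition. The exact method signature is an assumption, and a stand-in TopicPartition class replaces org.apache.kafka.common.TopicPartition so the sketch is self-contained.

```java
/** Stand-in for org.apache.kafka.common.TopicPartition, included only to keep this sketch self-contained. */
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

/**
 * Sketch of the proposed LatencyTime interface. The method name and
 * parameter are assumptions based on the description above.
 */
interface LatencyTime {
    /**
     * Returns the current wall clock time, as a UNIX epoch timestamp in
     * milliseconds, to use when calculating latency for records of the
     * given partition.
     */
    long getWallClockTime(TopicPartition tp);
}
```

Because the interface has a single abstract method, a default implementation backed by the system clock can be supplied as a lambda.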


Proposed Changes

Report latency metrics in the Kafka Consumer at the client (max latency), topic, and partition levels, similar to how consumer lag is currently reported.

KIP-32 introduced a timestamp field to the Kafka message format.  The timestamp may be provided by the user, the KafkaProducer, or the Broker, depending on how the Broker configuration message.timestamp.type is defined.  When assigned by Kafka, the timestamp value is the current wall clock time represented as a UNIX epoch long in milliseconds, as returned by the Java Standard Library call System.currentTimeMillis().  We assume that all application and Broker machines run active clock synchronization services, so that latency may be calculated by taking the difference of current wall clock time with the timestamp of a fetched record.  When the calculated latency is negative, the metric value will be reported as NaN.

UNIX epoch time is always represented as UTC time, and is therefore agnostic of any machine's particular default timezone.  To accommodate use cases where the user needs more control over the clock used to determine wall clock time (i.e. when the produced records have their timestamps overridden), the user can configure a custom implementation of the interface org.apache.kafka.clients.consumer.LatencyTime to return the correct time for the latency calculation.  This would be a class name configuration type provided by the user using consumer properties (ConfigDef.Type.CLASS).
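Assuming the latency.time.class property proposed above, wiring a custom clock into the consumer configuration might look like the following sketch. The implementation class name com.example.ProducerClockLatencyTime is hypothetical, used only for illustration.

```java
import java.util.Properties;

public class LatencyConfig {
    /**
     * Builds consumer properties that enable a custom LatencyTime
     * implementation via the proposed latency.time.class property.
     * The implementation class name is a hypothetical example.
     */
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        // Proposed optional property; resolved via ConfigDef.Type.CLASS.
        props.put("latency.time.class", "com.example.ProducerClockLatencyTime");
        return props;
    }
}
```

If the property is not set, the consumer would fall back to the default system-clock implementation.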

The implementation to calculate latency would be very similar to how records-lag is implemented in Fetcher.fetchRecords and the FetchManagerRegistry.  The latency metrics would be calculated once per poll for all partitions of the returned records.  The start timestamp would be extracted from the last record of each partition that is returned.  To retrieve the current wall clock time, it would use either a LatencyTime implementation configured by the user or the default implementation, which returns System.currentTimeMillis().  The latency would be calculated by taking the difference of the wall clock time and the provided record start timestamp, and would be recorded by the client and partition sensors for the records-latency metric.
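A minimal sketch of the per-poll calculation described above, with partitions simplified to strings and records reduced to their timestamps; names here are illustrative, not the actual Fetcher internals.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LatencySketch {
    /**
     * For each partition, takes the timestamp of the last returned record
     * and subtracts it from the current wall clock time. A negative result
     * is reported as NaN, matching the proposal.
     */
    static Map<String, Double> latencies(Map<String, List<Long>> recordTimestampsByPartition,
                                         long wallClockMs) {
        Map<String, Double> out = new HashMap<>();
        for (Map.Entry<String, List<Long>> e : recordTimestampsByPartition.entrySet()) {
            List<Long> timestamps = e.getValue();
            if (timestamps.isEmpty()) continue;                 // no records returned for this partition
            long last = timestamps.get(timestamps.size() - 1);  // start timestamp of the last record
            long latency = wallClockMs - last;
            out.put(e.getKey(), latency < 0 ? Double.NaN : (double) latency);
        }
        return out;
    }
}
```

The real implementation would record each value through the client and partition sensors rather than returning a map.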

At the partition level we can provide the latest calculated latency and the max and average latency within the metrics sample window.  At the topic and client level we only provide the max latency, which is the max(record-latency-max) of all partitions assigned to a client for a particular topic, or all topics.  An average or some other percentile could also be reported.  A sum of partition latencies would not make sense because it's expected that consumers will consume partitions in parallel and not in a serial manner.

Info: Using Latency Metric for SLAs

If a message was produced a long time ago, and a new consumer group has been created, then the latency metrics will have very high values until the consumer group catches up. This is especially true in the context of KIP-405: Kafka Tiered Storage, which allows reading very old messages. Therefore, a consumer application that relies on reading all messages from the past will report a high records-latency for a while.

Using this metric for SLAs should only be done when a consumer group is expected to be continuously consuming (in a steady state), and not for bootstrapping new consumer groups.

Compatibility, Deprecation, and Migration Plan

...

I have implemented this solution using a Consumer Interceptor and can share it if there’s interest.

Kafka Lag Exporter

Kafka Lag Exporter is an open source project that can estimate the latency of Kafka partitions.  It lets you track latency (or some aggregate of it) for apps, without having to modify the app in any way.  It polls the latest offset of all partitions belonging to the consumer groups and topics to be reported, and maintains an internal lookup table for each topic partition of (offset, timestamp) pairs, where the timestamp is the wall clock time at which the offset was measured.  At the same time, consumer group metadata is polled to retrieve the consumer group id, topic partition, and the last committed offset.  With this information an estimate is computed by doing the following:

  1. Lookup interpolation table for a consumer group partition
  2. Find two points within the table that contain the last consumed offset
    1. If there are no two points that contain the last consumed offset then use the first and last points as input to the interpolation formula. This is the extrapolation use case.
  3. Interpolate inside (or extrapolate outside) the two points from the table we picked to predict a timestamp for when the last consumed message was first produced.
  4. Take the difference of the time of the last consumed offset (~ the current time) and the predicted timestamp to find the time lag.
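The interpolation in steps 2 through 4 can be sketched as a simple linear estimate over two points from the lookup table. Method and parameter names are illustrative, not Kafka Lag Exporter's actual code.

```java
public class LagInterpolation {
    /**
     * Given two (offset, timestamp-in-ms) points from the lookup table and a
     * consumer group's last consumed offset, linearly interpolates (or, when
     * the offset lies outside the two points, extrapolates) the timestamp at
     * which that offset was produced, then subtracts it from the current time
     * to estimate the time lag in milliseconds.
     */
    static long estimateTimeLagMs(long offset1, long time1,
                                  long offset2, long time2,
                                  long consumedOffset, long nowMs) {
        double msPerOffset = (double) (time2 - time1) / (offset2 - offset1);
        double producedAtMs = time1 + msPerOffset * (consumedOffset - offset1);
        return Math.round(nowMs - producedAtMs);
    }
}
```

The same formula covers both the interpolation and extrapolation cases; accuracy depends on how closely the two table points bracket the consumed offset.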

https://www.lightbend.com/blog/monitor-kafka-consumer-group-latency-with-kafka-lag-exporter

While this tool works reasonably well, it has several limiting factors:

  1. It only works in cases where offsets are committed back to Kafka.  If an app or stream processor uses its own offset management then the current offset of a partition cannot be obtained from Kafka.
  2. It can only predict an estimate.  Accuracy improves with higher fidelity lookup tables (latest offsets looked up more frequently).

Disclosure: I (Sean Glover) am the author of the Kafka Lag Exporter project and the "Monitor Kafka Consumer Group Latency with Kafka Lag Exporter" blog post by Lightbend.