Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current stateUnder Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently KafkaConsumer only has a metric of max lag across all the partitions. It would be useful to know per partition lag as well.

Public Interfaces

There is no programmatic public interface change. We are only adding new metrics.

Proposed Changes

Add per partition lag metrics to KafkaConsumer. The metric names would be:

"TOPIC-PARTITION_ID.record-lag"
"TOPIC-PARTITION_ID.record-lag-avg"
"TOPIC-PARTITION_ID.record-lag-max" 

The way to get the metrics is the same as getting other metrics.

Code Block
languagejava
TopicPartition tp = new TopicPartition("topic", 0);
HashMap<String, String> tags = new HashMap<>();
tags.put("client-id", "metricTestConsumer0");

double currentLag = consumer0.metrics().get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)).value()
double averageLag = consumer0.metrics().get(new MetricName(tp + ".records-lag-avg", "consumer-fetch-manager-metrics", "", tags)).value()
double maxLag = consumer0.metrics().get(new MetricName(tp + ".records-lag-max", "consumer-fetch-manager-metrics", "", tags)).value()

Compatibility, Deprecation, and Migration Plan

The change is fully backwards compatible.

Rejected Alternatives

None