Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state"Under DiscussionAccepted"

Discussion thread

JIRA: KAFKA-13846

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Concurrent thread may try to access the Metrics registry to create the same, instance-level metric, however it's get/create APIs are not well suited for it. A common pattern that users follow today is:

...

The main motivation of this KIP is to expose an API which would make these operations atomic. That ways, users won't need to remember these steps and can just focus on having a metric created. 


Public Interfaces

A new public facing method getMetricOrElseCreate would be exposed. This would create a non-existing metric or create a return the Metric if it already exists. This way, users don't need to add extra logic to take respective actions in case of presence/absence of metrics. Note that this method takes care of synchronisation while updating/accessing metrics by concurrent threads.

...

Code Block
languagejava
titleMetrics.java
/**
     * Create or get an existing metric to monitor an object that implements MetricValueProvider.
     * This metric won't be associated with any sensor. This is a way to expose existing values as metrics.
     * This method takes care of synchronisation while updating/accessing metrics by concurrent threads.
     *
     * @param metricName The name of the metric
     * @param metricValueProvider The metric value provider associated with this metric
     * @return Existing KafkaMetric if already registered or else a newly created one
     */
    public KafkaMetric getMetricOrElseCreateaddMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider<?> metricValueProvider) {
        KafkaMetric metric = new KafkaMetric(new Object(),
                Objects.requireNonNull(metricName),
                Objects.requireNonNull(metricValueProvider),
                config == null ? this.config : config,
                time);

        KafkaMetric existingMetric = registerMetric(metric);
        return existingMetric == null ? metric : existingMetric;
    }

Proposed Changes

  • Adding a new function above to the Metrics API. As part of this change, the registerMetric  method's return type would be changed from void  to KafkaMetric . It would return a KafkaMetric object if the requested metric already exists or return null if not after creating/registering the metric. For backward compatibility reasons, any place currently which relied on IllegalArgumentException would now instead check the output of registerMetric  and throw an IllegalArguementException  when the returned value of registerMetric  is non-null. This change would happen in => Metrics.addMetric , 2 Sensor.add  methods. On the other hand, getMetricOrElseCreate  method would simply return the object returned by registerMetric if not null.

    Code Block
    languagejava
    /**
         * Register a metric if not present or return an already existing metric otherwise.
         * When a metric is newly registered, this method returns null
         *
         * @param metric The KafkaMetric to register
         * @return KafkaMetric if the metric already exists, null otherwise
         */ 
    synchronized KafkaMetric registerMetric(KafkaMetric metric) {
            MetricName metricName = metric.metricName();
            KafkaMetric existingMetric if= (this.metrics.containsKeyputIfAbsent(metricName, metric);
            if (existingMetric != null) {
                return this.metrics.get(metricName)existingMetric;
            }
            this.metrics.put(metricName,// newly added metric);
            for (MetricsReporter reporter : reporters) {
                try {
                    reporter.metricChange(metric);
                } catch (Exception e) {
                    log.error("Error when registering metric on " + reporter.getClass().getName(), e);
                }
            }
            log.trace("Registered metric named {}", metricName);
            return null;
        }


Compatibility, Deprecation, and Migration Plan

The changes are backward compatible and needs no deprecation/migration.

Rejected Alternatives

N/A