...

  • This KIP proposes the addition of latency and throughput metrics for Kafka Streams at the granularity of each processor node, and the addition of count metrics at the granularity of each task. This is in addition to the global rate (which already exists). The idea is to allow users to toggle the recording of these metrics when needed for debugging. The RecordLevel for these granular metrics is DEBUG, and a client can toggle the record level by changing “metrics.record.level” in the client config. (The introduction of RecordLevel and the client config changes are covered in the separate KIP-105.)
  • This KIP also proposes exposing the metrics registry as read-only, together with several helper functions, so that Kafka Streams users can register their own metrics.
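As a rough illustration of the toggle described above, the sketch below sets the record level in a client config and shows how a DEBUG-level sensor could be skipped while the configured level is INFO. The config key is spelled as in this KIP (the key in a released Kafka version may be spelled differently, so check the client configuration reference). The RecordLevelSketch enum, its shouldRecord method, and the INFO default are hypothetical stand-ins, not the classes introduced by KIP-105.

```java
import java.util.Properties;

// Hypothetical stand-in for the record level described in this KIP (not the KIP-105 class).
enum RecordLevelSketch {
    INFO(0), DEBUG(1);

    private final int id;

    RecordLevelSketch(int id) {
        this.id = id;
    }

    // A sensor records only if its own level is at or below the configured level.
    boolean shouldRecord(RecordLevelSketch configured) {
        return this.id <= configured.id;
    }
}

class RecordLevelConfigSketch {
    // Key as spelled in this KIP; an assumption, check your client's config reference.
    static final String RECORD_LEVEL_KEY = "metrics.record.level";

    // Read the configured level from client properties; defaulting to INFO is an assumption.
    static RecordLevelSketch configuredLevel(Properties props) {
        return RecordLevelSketch.valueOf(props.getProperty(RECORD_LEVEL_KEY, "INFO"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(RECORD_LEVEL_KEY, "DEBUG"); // turn on the granular metrics

        RecordLevelSketch configured = configuredLevel(props);
        System.out.println("DEBUG sensor records: "
                + RecordLevelSketch.DEBUG.shouldRecord(configured));
    }
}
```

With the level left at INFO, the same DEBUG sensor would be skipped, which is the point of making the granular metrics opt-in.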

...

  • A StreamsMetrics interface with the following methods:
    public interface StreamsMetrics {
    /**
    * @return The base registry where all the metrics are recorded
    */
     Metrics registry();

    /**
    * Get read-only handle on global metrics registry.
    * @return Map of all metrics.
    */
     Map<MetricName, ? extends Metric> metrics();

     /**
    * Add a latency sensor. This is equivalent to adding a sensor with metrics on latency and rate.
    *
    * @param scopeName Name of the scope, could be the type of the state store, etc.
    * @param entityName Name of the entity, could be the name of the state store instance, etc.
    * @param operationName Name of the operation, could be get / put / delete / etc.
    * @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
    * @param tags Additional tags of the sensor.
    * @return The added sensor.
    */
     Sensor addLatencySensor(String scopeName, String entityName, String operationName, Sensor.RecordLevel recordLevel, String... tags);

     /**
    * Record the given latency value of the sensor.
    * @param sensor sensor whose latency we are recording.
    * @param startNs start of measurement time in nanoseconds.
    * @param endNs end of measurement time in nanoseconds.
    */
     void recordLatency(Sensor sensor, long startNs, long endNs);

     /**
    * Add a throughput sensor. This is equivalent to adding a sensor with metrics on rate.
    *
    * @param scopeName Name of the scope, could be the type of the state store, etc.
    * @param entityName Name of the entity, could be the name of the state store instance, etc.
    * @param operationName Name of the operation, could be get / put / delete / etc.
    * @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
    * @param tags Additional tags of the sensor.
    * @return The added sensor.
    */
     Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordLevel recordLevel, String... tags);

     /**
    * Records the throughput value of a sensor.
    * @param sensor sensor whose throughput we are recording.
    * @param value throughput value.
    */
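     void recordThroughput(Sensor sensor, long value);

     /**
    * Remove a sensor with the given name.
    *
    * @param name Sensor name to be removed.
    */
     void removeSensor(String name);
    }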
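A minimal sketch of the call pattern behind recordLatency, assuming a hypothetical in-memory stand-in (LatencySensorSketch) rather than Kafka's Sensor: the caller takes two nanosecond timestamps around an operation and hands both to the sensor, which keeps a count, an average and a maximum. The class and its accessors are illustrative only and are not part of this KIP.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for a latency sensor; not Kafka's Sensor class.
class LatencySensorSketch {
    private long count;
    private long totalNs;
    private long maxNs;

    // Mirrors the shape of recordLatency(sensor, startNs, endNs) from the interface above.
    synchronized void recordLatency(long startNs, long endNs) {
        long elapsed = endNs - startNs;
        count++;
        totalNs += elapsed;
        maxNs = Math.max(maxNs, elapsed);
    }

    synchronized long count() { return count; }

    synchronized long maxNs() { return maxNs; }

    synchronized double avgNs() { return count == 0 ? 0.0 : (double) totalNs / count; }

    public static void main(String[] args) throws InterruptedException {
        LatencySensorSketch putLatency = new LatencySensorSketch();
        for (int i = 0; i < 3; i++) {
            long startNs = System.nanoTime();
            TimeUnit.MILLISECONDS.sleep(1); // stands in for a state store put()
            putLatency.recordLatency(startNs, System.nanoTime());
        }
        System.out.printf("count=%d avg=%.0fns max=%dns%n",
                putLatency.count(), putLatency.avgNs(), putLatency.maxNs());
    }
}
```

Passing start and end timestamps, rather than a precomputed duration, keeps the subtraction in one place and lets the caller reuse a timestamp it has already taken.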

     

    It follows from the above that we are also, for the first time, exposing the Metrics class. The Metrics class has always been internal so far. The argument for exposing it is that, unlike for simple Producers and Consumers, the application's code and the streams code co-exist, and we have the objective of having app metrics in the same registry as streams metrics. We could attempt to provide yet another layer on top of Metrics, but we would be replicating a lot of code. The Metrics class has been in org.apache.kafka.common.metrics for a while, but since we have never exposed it externally, we list its methods as part of this KIP:
    public class Metrics implements Closeable {
    /**
    * Create a metrics repository with no metric reporters and default configuration.
    * Expiration of Sensors is disabled.
    */
     public Metrics();
    /**
    * Create a metrics repository with no metric reporters and default configuration.
    * Expiration of Sensors is disabled.
    */
     public Metrics(Time time);
    /**
    * Create a metrics repository with no metric reporters and the given default configuration.
    * Expiration of Sensors is disabled.
    */
     public Metrics(MetricConfig defaultConfig, Time time);
    /**
    * Create a metrics repository with no reporters and the given default config. This config will be used for any
    * metric that doesn't override its own config. Expiration of Sensors is disabled.
    * @param defaultConfig The default config to use for all metrics that don't override their config
    */
     public Metrics(MetricConfig defaultConfig);
    /**
    * Create a metrics repository with a default config and the given metric reporters.
    * Expiration of Sensors is disabled.
    * @param defaultConfig The default config
    * @param reporters The metrics reporters
    * @param time The time instance to use with the metrics
    */
     public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time);
    /**
    * Create a metrics repository with a default config, given metric reporters and the ability to expire eligible sensors
    * @param defaultConfig The default config
    * @param reporters The metrics reporters
    * @param time The time instance to use with the metrics
    * @param enableExpiration true if the metrics instance can garbage collect inactive sensors, false otherwise
    */
     public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration);
    /**
    * Create a MetricName with the given name, group, description and tags, plus default tags specified in the metric
    * configuration. Tag in tags takes precedence if the same tag key is specified in the default metric configuration.
    *
    * @param name The name of the metric
    * @param group logical group name of the metrics to which this metric belongs
    * @param description A human-readable description to include in the metric
    * @param tags additional key/value attributes of the metric
    */
     public MetricName metricName(String name, String group, String description, Map<String, String> tags);
    /**
    * Create a MetricName with the given name, group, description, and default tags
    * specified in the metric configuration.
    *
    * @param name The name of the metric
    * @param group logical group name of the metrics to which this metric belongs
    * @param description A human-readable description to include in the metric
    */
     public MetricName metricName(String name, String group, String description);
    /**
    * Create a MetricName with the given name, group and default tags specified in the metric configuration.
    *
    * @param name The name of the metric
    * @param group logical group name of the metrics to which this metric belongs
    */
     public MetricName metricName(String name, String group);
    /**
    * Create a MetricName with the given name, group, description, and keyValue as tags, plus default tags specified in the metric
    * configuration. Tag in keyValue takes precedence if the same tag key is specified in the default metric configuration.
    *
    * @param name The name of the metric
    * @param group logical group name of the metrics to which this metric belongs
    * @param description A human-readable description to include in the metric
    * @param keyValue additional key/value attributes of the metric (must come in pairs)
    */
     public MetricName metricName(String name, String group, String description, String... keyValue);
    /**
    * Create a MetricName with the given name, group and tags, plus default tags specified in the metric
    * configuration. Tag in tags takes precedence if the same tag key is specified in the default metric configuration.
    *
    * @param name The name of the metric
    * @param group logical group name of the metrics to which this metric belongs
    * @param tags key/value attributes of the metric
    */
     public MetricName metricName(String name, String group, Map<String, String> tags);
    public MetricConfig config();
    /**
    * Get the sensor with the given name if it exists
    * @param name The name of the sensor
    * @return Return the sensor or null if no such sensor exists
    */
     public Sensor getSensor(String name);
    /**
    * Get or create a sensor with the given unique name and no parent sensors.
    * @param name The sensor name
    * @return The sensor
    */
     public Sensor sensor(String name);

    /**
    * Get or create a sensor with the given unique name and no parent sensors and with a given
    * recording level.
    * @param name The sensor name.
    * @param recordLevel The recording level.
    * @return The sensor
    */
     public Sensor sensor(String name, Sensor.RecordLevel recordLevel);
    /**
    * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
    * receive every value recorded with this sensor.
    * @param name The name of the sensor
    * @param parents The parent sensors
    * @return The sensor that is created
    */
     public Sensor sensor(String name, Sensor... parents);
    /**
    * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
    * receive every value recorded with this sensor.
    * @param name The name of the sensor.
    * @param recordLevel The recording level.
    * @param parents The parent sensors.
    * @return The sensor that is created
    */
     public Sensor sensor(String name, Sensor.RecordLevel recordLevel, Sensor... parents);
    /**
    * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
    * receive every value recorded with this sensor.
    * @param name The name of the sensor
    * @param config A default configuration to use for this sensor for metrics that don't have their own config
    * @param parents The parent sensors
    * @return The sensor that is created
    */
     public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents);
    /**
    * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
    * receive every value recorded with this sensor.
    * @param name The name of the sensor
    * @param config A default configuration to use for this sensor for metrics that don't have their own config
    * @param recordLevel The recording level.
    * @param parents The parent sensors
    * @return The sensor that is created
    */
     public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordLevel recordLevel, Sensor... parents);
    /**
    * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
    * receive every value recorded with this sensor.
    * @param name The name of the sensor
    * @param config A default configuration to use for this sensor for metrics that don't have their own config
    * @param inactiveSensorExpirationTimeSeconds If no value is recorded on the Sensor for this duration of time,
    * it is eligible for removal
    * @param recordLevel The recording level.
    * @param parents The parent sensors
    * @return The sensor that is created
    */
     public synchronized Sensor sensor(String name, MetricConfig config, long inactiveSensorExpirationTimeSeconds, Sensor.RecordLevel recordLevel, Sensor... parents);
    /**
    * Remove a sensor (if it exists), associated metrics and its children.
    *
    * @param name The name of the sensor to be removed
    */
     public void removeSensor(String name);
    /**
    * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
    * This is a way to expose existing values as metrics.
    * @param metricName The name of the metric
    * @param measurable The measurable that will be measured by this metric
    */
     public void addMetric(MetricName metricName, Measurable measurable);
    /**
    * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
    * This is a way to expose existing values as metrics.
    * @param metricName The name of the metric
    * @param config The configuration to use when measuring this measurable
    * @param measurable The measurable that will be measured by this metric
    */
     public synchronized void addMetric(MetricName metricName, MetricConfig config, Measurable measurable);
    /**
    * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval`
    * will be invoked for each reporter.
    *
    * @param metricName The name of the metric
    * @return the removed `KafkaMetric` or null if no such metric exists
    */
     public synchronized KafkaMetric removeMetric(MetricName metricName);
    /**
    * Add a MetricReporter
    */
     public synchronized void addReporter(MetricsReporter reporter);
    /**
    * Get all the metrics currently maintained indexed by metricName
    */
     public Map<MetricName, KafkaMetric> metrics();

     public KafkaMetric metric(MetricName metricName);
    /**
    * Close this metrics repository.
    */
     @Override
     public void close();
    }

  • Furthermore, the KafkaStreams class can also expose all the metrics read-only. So we add the same method we added to the StreamsMetrics interface.
    /**
    * Get read-only handle on global metrics registry.
    * @return Map of all metrics.
    */
    Map<MetricName, ? extends Metric> metrics();
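One way to picture a read-only handle on a registry is an unmodifiable view over the internal map, sketched below under stated assumptions: ReadOnlyMetricsSketch is a hypothetical class, String names and Double values stand in for MetricName and Metric, and the metric name used is made up. This is not how KafkaStreams implements metrics(); it only shows the behaviour a caller could expect from a read-only map.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry; String names and Double values stand in for MetricName and Metric.
class ReadOnlyMetricsSketch {
    private final Map<String, Double> registry = new ConcurrentHashMap<>();

    // Internal code can still register or update metrics.
    void register(String name, double value) {
        registry.put(name, value);
    }

    // Callers get a read-only view: reads see live values, writes are rejected.
    Map<String, Double> metrics() {
        return Collections.unmodifiableMap(registry);
    }

    public static void main(String[] args) {
        ReadOnlyMetricsSketch streams = new ReadOnlyMetricsSketch();
        streams.register("process-latency-avg", 1.5); // illustrative metric name

        Map<String, Double> view = streams.metrics();
        view.forEach((name, value) -> System.out.println(name + " = " + value));

        try {
            view.put("my-metric", 1.0);
        } catch (UnsupportedOperationException e) {
            System.out.println("view is read-only");
        }
    }
}
```

Because the view wraps the live map instead of copying it, a monitoring thread that holds the handle keeps seeing newly registered metrics without asking for the map again.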

Proposed Changes

  • Enumeration of Sensors: This KIP proposes the introduction of the following sensors

    • Node punctuate time sensor: This sensor is associated with latency metrics depicting the average and max latency in the punctuate time of a node.

    • Node creation time sensor: This sensor is associated with latency metrics depicting the average and max latency in the creation time of a node.

    • Node destruction time sensor: This sensor is associated with latency metrics depicting the average and max latency in the destruction time of a node.

    • Node process time sensor: This sensor is associated with latency metrics depicting the average and max latency in the process time of a node.

    • Node throughput sensor: This sensor is associated with throughput metrics depicting the context forwarding rate of records through a node, i.e., indicating how many records were forwarded downstream from this processor node.

    • Skipped records sensor in StreamTask: This sensor is associated with a count metric, which helps monitor whether streams are well synchronized. The metric measures the difference between the total record count and the number of added records since the last record time. This is useful during debugging, as this count should not be off by too much during normal operations.
    • Finally, all metrics can be read (read-only) through the metrics() calls.
  • Addition of new sensors

    • Users can use the provided helper functions addLatencySensor and addThroughputSensor to register metrics, or register metrics directly with the exposed underlying metrics registry obtained through registry(), and removeSensor to remove sensors. Note that addLatencySensor already existed in the code base.
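The skipped-records count in the sensor list above comes down to simple arithmetic: records handed to the task minus records actually added. A minimal sketch follows, assuming a hypothetical SkippedRecordsSketch counter whose names and argument check are illustrative and not taken from the StreamTask code.

```java
// Hypothetical counter for the skipped-records metric; names are illustrative only.
class SkippedRecordsSketch {
    private long skipped;

    // totalRecords: records handed to the task; addedRecords: records actually added.
    void record(int totalRecords, int addedRecords) {
        if (addedRecords < 0 || addedRecords > totalRecords) {
            throw new IllegalArgumentException("addedRecords must be in [0, totalRecords]");
        }
        skipped += totalRecords - addedRecords;
    }

    long skipped() {
        return skipped;
    }

    public static void main(String[] args) {
        SkippedRecordsSketch sensor = new SkippedRecordsSketch();
        sensor.record(100, 100); // nothing skipped
        sensor.record(50, 47);   // three records were not added
        System.out.println("skipped so far: " + sensor.skipped());
    }
}
```

A count that stays near zero matches the "should not be off by too much" expectation, while a steadily growing count is the signal worth debugging.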

       

Compatibility, Deprecation, and Migration Plan

  • none

Rejected Alternatives

  • Allow the user to register arbitrary metrics by exposing the Metrics class. Unfortunately, the class has been used only internally so far and is not yet ready to become public (e.g., there are several unnecessary methods in there). This might have to wait until the Metrics class is cleaned up.
  • Provide an interface on top of the Metrics class. This is doable; however, StreamsMetrics is arguably already such an interface and allows users to register throughput and latency metrics for streams.