Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-3715: Higher granularity Streams metrics

Motivation

 

  • This KIP proposes adding latency and throughput metrics for Kafka Streams at the granularity of each processor node, and count metrics at the granularity of each task. These are in addition to the global rate, which already exists. The idea is to let users toggle the recording of these metrics when needed for debugging. The RecordLevel for these granular metrics is DEBUG, and a client can change the record level by setting “metrics.record.level” in the client config. (The introduction of RecordLevel and the corresponding client config changes are covered separately in KIP-105.)
  • This KIP also proposes exposing the metrics registry and several helper functions so that Kafka Streams users can register their own metrics. 
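
To make the toggle concrete, here is a minimal sketch of how a DEBUG-level sensor could be skipped unless the configured level is DEBUG. RecordLevelSketch and RecordLevelDemo are hypothetical stand-ins written for this example, and the INFO default is an assumption; the actual RecordLevel definition belongs to KIP-105. Only the config key “metrics.record.level” is taken from the text above.

```java
import java.util.Properties;

// Hypothetical stand-in for the RecordLevel proposed in KIP-105; not the Kafka class.
enum RecordLevelSketch {
    INFO, DEBUG;

    // A sensor records only if its own level is no more verbose than the configured level.
    boolean shouldRecord(RecordLevelSketch configured) {
        return this.ordinal() <= configured.ordinal();
    }
}

class RecordLevelDemo {
    // Reads the level from the client config; defaulting to INFO is an assumption of this sketch.
    static RecordLevelSketch configuredLevel(Properties props) {
        return RecordLevelSketch.valueOf(props.getProperty("metrics.record.level", "INFO"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("metrics.record.level", "DEBUG");
        RecordLevelSketch configured = configuredLevel(props);
        // With the level set to DEBUG, both INFO and DEBUG sensors would record.
        System.out.println("INFO sensor records:  " + RecordLevelSketch.INFO.shouldRecord(configured));
        System.out.println("DEBUG sensor records: " + RecordLevelSketch.DEBUG.shouldRecord(configured));
    }
}
```

Under this model, leaving the key unset keeps the granular DEBUG sensors silent while INFO sensors continue to record.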


 

Public Interfaces

  • A StreamsMetrics interface with the following methods:
    public interface StreamsMetrics {

    /**
    * @return The base registry where all the metrics are recorded.
    */
     Metrics registry();

     /**
    * Add a latency sensor. This is equivalent to adding a sensor with metrics on latency and rate.
    *
    * @param scopeName Name of the scope, could be the type of the state store, etc.
    * @param entityName Name of the entity, could be the name of the state store instance, etc.
    * @param operationName Name of the operation, could be get / put / delete / etc.
    * @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
    * @param tags Additional tags of the sensor.
    * @return The added sensor.
    */
     Sensor addLatencySensor(String scopeName, String entityName, String operationName, Sensor.RecordLevel recordLevel, String... tags);

     /**
    * Record the given latency value of the sensor.
    * @param sensor sensor whose latency we are recording.
    * @param startNs start of measurement time in nanoseconds.
    * @param endNs end of measurement time in nanoseconds.
    */
     void recordLatency(Sensor sensor, long startNs, long endNs);

     /**
    * Add a throughput sensor. This is equivalent to adding a sensor with rate metrics.
    *
    * @param scopeName Name of the scope, could be the type of the state store, etc.
    * @param entityName Name of the entity, could be the name of the state store instance, etc.
    * @param operationName Name of the operation, could be get / put / delete / etc.
    * @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
    * @param tags Additional tags of the sensor.
    * @return The added sensor.
    */
     Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordLevel recordLevel, String... tags);

     /**
    * Records the throughput value of a sensor.
    * @param sensor sensor whose throughput we are recording.
    * @param value throughput value.
    */
     void recordThroughput(Sensor sensor, long value);

    }
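
As a rough illustration of the intended calling pattern for recordLatency, the sketch below times an operation with System.nanoTime() and feeds the start and end timestamps to a sensor. LatencySensorSketch and LatencyDemo are hypothetical stand-ins that model only an average and a max; they are not the Kafka Sensor or StreamsMetrics implementations, and the static recordLatency helper merely mirrors the shape of the interface method above.

```java
// Hypothetical stand-in for a latency sensor; it models only average and max
// and is not the Kafka Sensor class.
class LatencySensorSketch {
    private long count;
    private long totalNs;
    private long maxNs;

    void record(long latencyNs) {
        count++;
        totalNs += latencyNs;
        maxNs = Math.max(maxNs, latencyNs);
    }

    double avgNs() { return count == 0 ? 0.0 : (double) totalNs / count; }
    long maxNs() { return maxNs; }
    long count() { return count; }
}

class LatencyDemo {
    // Mirrors the shape of StreamsMetrics.recordLatency(sensor, startNs, endNs).
    static void recordLatency(LatencySensorSketch sensor, long startNs, long endNs) {
        sensor.record(endNs - startNs);
    }

    public static void main(String[] args) {
        LatencySensorSketch putLatency = new LatencySensorSketch();
        long startNs = System.nanoTime();
        // ... the operation being timed would run here, e.g. a state store put ...
        long endNs = System.nanoTime();
        recordLatency(putLatency, startNs, endNs);
        System.out.println("avg=" + putLatency.avgNs() + "ns max=" + putLatency.maxNs() + "ns");
    }
}
```

Passing explicit start and end timestamps, rather than a precomputed duration, lets the caller reuse one System.nanoTime() reading across several sensors.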

     

  • It follows from the above that we are also, for the first time, exposing the Metrics class, which has been internal so far. The argument for exposing it is that, unlike with plain Producers and Consumers, the application's code and the Streams code co-exist, and our objective is to have application metrics in the same registry as Streams metrics. We could provide yet another layer on top of Metrics, but that would replicate a lot of code. The Metrics class has been in org.apache.kafka.common.metrics for a while, but since we have never exposed it externally, we list its methods as part of this KIP:
    public class Metrics implements Closeable {
    /**
    * Create a metrics repository with no metric reporters and default configuration.
    * Expiration of Sensors is disabled.
    */
     public Metrics();

    /**
    * Create a metrics repository with no metric reporters and default configuration.
    * Expiration of Sensors is disabled.
    */
     public Metrics(Time time);

    /**
    * Create a metrics repository with no metric reporters and the given default configuration.
    * Expiration of Sensors is disabled.
    */
     public Metrics(MetricConfig defaultConfig, Time time);


    /**
    * Create a metrics repository with no reporters and the given default config. This config will be used for any
    * metric that doesn't override its own config. Expiration of Sensors is disabled.
    * @param defaultConfig The default config to use for all metrics that don't override their config
    */
     public Metrics(MetricConfig defaultConfig);

    /**
    * Create a metrics repository with a default config and the given metric reporters.
    * Expiration of Sensors is disabled.
    * @param defaultConfig The default config
    * @param reporters The metrics reporters
    * @param time The time instance to use with the metrics
    */
     public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time);

    /**
    * Create a metrics repository with a default config, given metric reporters and the ability to expire eligible sensors
    * @param defaultConfig The default config
    * @param reporters The metrics reporters
    * @param time The time instance to use with the metrics
    * @param enableExpiration true if the metrics instance can garbage collect inactive sensors, false otherwise
    */
     public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration);

    /**
    * Create a MetricName with the given name, group, description and tags, plus default tags specified in the metric
    * configuration. Tag in tags takes precedence if the same tag key is specified in the default metric configuration.
    *
    * @param name The name of the metric
    * @param group logical group name of the metrics to which this metric belongs
    * @param description A human-readable description to include in the metric
    * @param tags additional key/value attributes of the metric
    */
     public MetricName metricName(String name, String group, String description, Map<String, String> tags);

    /**
    * Create a MetricName with the given name, group, description, and default tags
    * specified in the metric configuration.
    *
    * @param name The name of the metric
    * @param group logical group name of the metrics to which this metric belongs
    * @param description A human-readable description to include in the metric
    */
     public MetricName metricName(String name, String group, String description);

    /**
    * Create a MetricName with the given name, group and default tags specified in the metric configuration.
    *
    * @param name The name of the metric
    * @param group logical group name of the metrics to which this metric belongs
    */
     public MetricName metricName(String name, String group);

    /**
    * Create a MetricName with the given name, group, description, and keyValue as tags, plus default tags specified in the metric
    * configuration. Tag in keyValue takes precedence if the same tag key is specified in the default metric configuration.
    *
    * @param name The name of the metric
    * @param group logical group name of the metrics to which this metric belongs
    * @param description A human-readable description to include in the metric
    * @param keyValue additional key/value attributes of the metric (must come in pairs)
    */
     public MetricName metricName(String name, String group, String description, String... keyValue);

    /**
    * Create a MetricName with the given name, group and tags, plus default tags specified in the metric
    * configuration. Tag in tags takes precedence if the same tag key is specified in the default metric configuration.
    *
    * @param name The name of the metric
    * @param group logical group name of the metrics to which this metric belongs
    * @param tags key/value attributes of the metric
    */
     public MetricName metricName(String name, String group, Map<String, String> tags);


    public MetricConfig config();

    /**
    * Get the sensor with the given name if it exists
    * @param name The name of the sensor
    * @return Return the sensor or null if no such sensor exists
    */
     public Sensor getSensor(String name);

    /**
    * Get or create a sensor with the given unique name and no parent sensors.
    * @param name The sensor name
    * @return The sensor
    */
     public Sensor sensor(String name);

    /**
    * Get or create a sensor with the given unique name and no parent sensors and with a given
    * recording level.
    * @param name The sensor name.
    * @param recordLevel The recording level.
    * @return The sensor
    */
     public Sensor sensor(String name, Sensor.RecordLevel recordLevel);


    /**
    * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
    * receive every value recorded with this sensor.
    * @param name The name of the sensor
    * @param parents The parent sensors
    * @return The sensor that is created
    */
     public Sensor sensor(String name, Sensor... parents);

    /**
    * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
    * receive every value recorded with this sensor.
    * @param name The name of the sensor.
    * @param recordLevel The recording level.
    * @param parents The parent sensors.
    * @return The sensor that is created
    */
     public Sensor sensor(String name, Sensor.RecordLevel recordLevel, Sensor... parents);

     /**
     * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
    * receive every value recorded with this sensor.
    * @param name The name of the sensor
    * @param config A default configuration to use for this sensor for metrics that don't have their own config
    * @param parents The parent sensors
    * @return The sensor that is created
    */
     public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents);

    /**
    * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
    * receive every value recorded with this sensor.
    * @param name The name of the sensor
    * @param config A default configuration to use for this sensor for metrics that don't have their own config
    * @param recordLevel The recording level.
    * @param parents The parent sensors
    * @return The sensor that is created
    */
     public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordLevel recordLevel, Sensor... parents);

     

    /**
    * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
    * receive every value recorded with this sensor.
    * @param name The name of the sensor
    * @param config A default configuration to use for this sensor for metrics that don't have their own config
    * @param inactiveSensorExpirationTimeSeconds If no value is recorded on the Sensor for this duration of time,
    * it is eligible for removal
    * @param recordLevel The recording level.
    * @param parents The parent sensors
    * @return The sensor that is created
    */
    public synchronized Sensor sensor(String name, MetricConfig config, long inactiveSensorExpirationTimeSeconds, Sensor.RecordLevel recordLevel, Sensor... parents);


    /**
    * Remove a sensor (if it exists), associated metrics and its children.
    *
    * @param name The name of the sensor to be removed
    */
     public void removeSensor(String name);

    /**
    * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
    * This is a way to expose existing values as metrics.
    * @param metricName The name of the metric
    * @param measurable The measurable that will be measured by this metric
    */
     public void addMetric(MetricName metricName, Measurable measurable);

    /**
    * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
    * This is a way to expose existing values as metrics.
    * @param metricName The name of the metric
    * @param config The configuration to use when measuring this measurable
    * @param measurable The measurable that will be measured by this metric
    */
     public synchronized void addMetric(MetricName metricName, MetricConfig config, Measurable measurable);

    /**
    * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval`
    * will be invoked for each reporter.
    *
    * @param metricName The name of the metric
    * @return the removed `KafkaMetric` or null if no such metric exists
    */
     public synchronized KafkaMetric removeMetric(MetricName metricName);

    /**
    * Add a MetricsReporter
    */
     public synchronized void addReporter(MetricsReporter reporter);

    /**
    * Get all the metrics currently maintained indexed by metricName
    */
     public Map<MetricName, KafkaMetric> metrics();

     public KafkaMetric metric(MetricName metricName);

    /**
    * Close this metrics repository.
    */
     @Override
     public void close();

    }
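
The parent-sensor behaviour documented above ("All parent sensors will receive every value recorded with this sensor") can be pictured with the following sketch. SensorSketch and ParentSensorDemo are hypothetical stand-ins that model only the propagation rule; they are not the Kafka classes, and the sensor names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in that models only parent propagation; not the Kafka Sensor class.
class SensorSketch {
    private final String name;
    private final List<SensorSketch> parents;
    private final List<Double> recorded = new ArrayList<>();

    SensorSketch(String name, SensorSketch... parents) {
        this.name = name;
        this.parents = Arrays.asList(parents);
    }

    // Records the value locally, then forwards it to every parent.
    void record(double value) {
        recorded.add(value);
        for (SensorSketch parent : parents) {
            parent.record(value);
        }
    }

    String name() { return name; }
    List<Double> recorded() { return recorded; }
}

class ParentSensorDemo {
    public static void main(String[] args) {
        // One aggregate sensor acting as parent of two per-node sensors.
        SensorSketch all = new SensorSketch("all-nodes");
        SensorSketch nodeA = new SensorSketch("node-a", all);
        SensorSketch nodeB = new SensorSketch("node-b", all);
        nodeA.record(1.0);
        nodeB.record(2.0);
        System.out.println(all.name() + " saw " + all.recorded());
    }
}
```

This is one way fine-grained per-node sensors could roll up into a coarser aggregate without recording each value twice at the call site.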

Proposed Changes

  • Enumeration of Sensors: This KIP proposes the introduction of the following sensors

    • Node punctuate time sensor: This sensor is associated with latency metrics depicting the average and max latency in the punctuate time of a node.

    • Node creation time sensor: This sensor is associated with latency metrics depicting the average and max latency in the creation time of a node.

    • Node destruction time sensor:  This sensor is associated with latency metrics depicting the average and max latency in the destruction time of a node.

    • Node process time sensor: This sensor is associated with latency metrics depicting the average and max latency in the process time of a node.

    • Node throughput sensor: This sensor is associated with throughput metrics depicting the context forwarding rate of records through a node, i.e., how many records were forwarded downstream from this processor node.

    • Skipped records sensor in StreamTask: This sensor is associated with a count metric, which helps monitor whether streams are well synchronized. The metric measures the difference in the total record count and the number of added records between the last record time. This is useful during debugging, as this count should not be off by too much during normal operation.

  • Addition of new sensors

    • Users can use the provided helper functions addLatencySensor and addThroughputSensor, or register metrics directly with the exposed underlying metrics registry obtained through registry().
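
For intuition about what a throughput sensor reports, the sketch below computes a simple rate as the total recorded value divided by elapsed time. ThroughputSensorSketch and ThroughputDemo are hypothetical stand-ins, and the total-over-elapsed-time formula is an assumption of this example; the rate calculation used by the real metrics library may differ (for instance, by using sampled windows).

```java
// Hypothetical stand-in for a throughput sensor: a simple total-over-elapsed-time rate.
class ThroughputSensorSketch {
    private final long startMs;
    private long total;

    ThroughputSensorSketch(long startMs) {
        this.startMs = startMs;
    }

    // Mirrors the shape of recordThroughput(sensor, value): adds to the running total.
    void record(long value) {
        total += value;
    }

    // Records per second since startMs; returns 0 if no time has elapsed.
    double ratePerSecond(long nowMs) {
        long elapsedMs = nowMs - startMs;
        return elapsedMs <= 0 ? 0.0 : total * 1000.0 / elapsedMs;
    }
}

class ThroughputDemo {
    public static void main(String[] args) {
        long startMs = System.currentTimeMillis();
        ThroughputSensorSketch forwarded = new ThroughputSensorSketch(startMs);
        forwarded.record(5);   // e.g. five records forwarded downstream
        forwarded.record(15);  // fifteen more
        // Evaluate the rate as if two seconds had passed since the start.
        System.out.println("records/sec: " + forwarded.ratePerSecond(startMs + 2000));
    }
}
```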

Compatibility, Deprecation, and Migration Plan

  • none

Rejected Alternatives

  • none