...

  • A StreamsMetrics interface with the following methods:
     
    @InterfaceStability.Unstable
    public interface StreamsMetrics {
    /**
    * Get read-only handle on global metrics registry
    * @return Map of all metrics.
    */
    Map<MetricName, ? extends Metric> metrics();

     /**
    * Add a latency sensor. This is equivalent to adding a sensor with metrics on latency and rate.
    *
    * @param scopeName Name of the scope, could be the type of the state store, etc.
    * @param entityName Name of the entity, could be the name of the state store instance, etc.
    * @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
    * @param operationName Name of the operation, could be get / put / delete / etc.
    * @param tags Additional tags of the sensor.
    * @return The added sensor.
    */
     Sensor addLatencySensor(String scopeName, String entityName, String operationName, Sensor.RecordLevel recordLevel, String... tags);

     /**
    * Record the given latency value of the sensor.
    * @param sensor sensor whose latency we are recording.
    * @param startNs start of measurement time in nanoseconds.
    * @param endNs end of measurement time in nanoseconds.
    */
     void recordLatency(Sensor sensor, long startNs, long endNs);

     /**
    * Add a throughput sensor. This is equivalent to adding a sensor with rate metrics.
    *
    * @param scopeName Name of the scope, could be the type of the state store, etc.
    * @param entityName Name of the entity, could be the name of the state store instance, etc.
    * @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
    * @param operationName Name of the operation, could be get / put / delete / etc.
    * @param tags Additional tags of the sensor.
    * @return The added sensor.
    */
     Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordLevel recordLevel, String... tags);

     /**
    * Records the throughput value of a sensor.
    * @param sensor sensor whose throughput we are recording.
    * @param value throughput value.
    */
     void recordThroughput(Sensor sensor, long value);
     
    /**
    * Generic sensor creation. Note that for most cases it is advisable to use {@link #addThroughputSensor(String, String, String, Sensor.RecordLevel, String...)}
    * or {@link #addLatencySensor(String, String, String, Sensor.RecordLevel, String...)} to ensure metric name well-formedness and conformity with the rest
    * of the streams code base.
    * @param name Name of the sensor.
    * @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
    * @return The added sensor.
    */
    Sensor addSensor(String name, Sensor.RecordLevel recordLevel);

    /**
    * Same as {@link #addSensor(String, Sensor.RecordLevel)}, but also takes a set of parent sensors.
    *
    */
    Sensor addSensor(String name, Sensor.RecordLevel recordLevel, Sensor... parents);
    /**
    * Remove the given sensor.
    * @param sensor Sensor to be removed.
    */
    void removeSensor(Sensor sensor);

    }

  • Furthermore, the KafkaStreams class can also expose all metrics as a read-only view, so we add to it the same method we added to the StreamsMetrics interface:
    /**
    * Get read-only handle on global metrics registry
    * @return Map of all metrics.
    */
    Map<MetricName, ? extends Metric> metrics();
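
To illustrate how a caller might use the proposed methods, here is a minimal, self-contained sketch. It does not use the real Kafka classes: Sensor and SimpleStreamsMetrics below are hypothetical stand-ins written for this example, the sensor name layout (scope-entity-operation-latency) is an assumption, the recordLevel and tags parameters are omitted for brevity, and metrics() returns a simplified Map<String, Sensor> rather than Map<MetricName, ? extends Metric>. It only shows the intended flow: add a latency sensor, record a start/end pair in nanoseconds, and read the registry through a read-only handle.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class StreamsMetricsSketch {

    /** Hypothetical stand-in for a sensor; it only keeps a sample count and a running total. */
    static final class Sensor {
        final String name;
        long count;
        double total;

        Sensor(String name) {
            this.name = name;
        }

        void record(double value) {
            count++;
            total += value;
        }

        double avg() {
            return count == 0 ? 0.0 : total / count;
        }
    }

    /** Hypothetical, much-reduced implementation of the proposed interface. */
    static final class SimpleStreamsMetrics {
        private final Map<String, Sensor> sensors = new LinkedHashMap<>();

        // The name layout is an assumption made for this sketch only.
        Sensor addLatencySensor(String scopeName, String entityName, String operationName) {
            String name = scopeName + "-" + entityName + "-" + operationName + "-latency";
            return sensors.computeIfAbsent(name, Sensor::new);
        }

        // Records the elapsed time (endNs - startNs) as one latency sample.
        void recordLatency(Sensor sensor, long startNs, long endNs) {
            sensor.record(endNs - startNs);
        }

        void removeSensor(Sensor sensor) {
            sensors.remove(sensor.name);
        }

        // Read-only handle: callers can inspect the registry but not modify it.
        Map<String, Sensor> metrics() {
            return Collections.unmodifiableMap(sensors);
        }
    }

    public static void main(String[] args) {
        SimpleStreamsMetrics metrics = new SimpleStreamsMetrics();
        Sensor putLatency = metrics.addLatencySensor("state-store", "my-store", "put");

        long startNs = System.nanoTime();
        // ... the measured operation (e.g., a put on a state store) would run here ...
        long endNs = System.nanoTime();
        metrics.recordLatency(putLatency, startNs, endNs);

        System.out.println(putLatency.name + " recorded " + putLatency.count + " sample(s)");
        System.out.println("registered sensors: " + metrics.metrics().keySet());
    }
}
```

Returning an unmodifiable view here mirrors the "read-only handle" wording of the proposal: an attempt to put into or remove from the returned map throws UnsupportedOperationException, while sensors added or removed through the metrics object remain visible through the view.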

...