Current state: Accepted
Discussion thread: here
JIRA: KAFKA-3715: Higher granularity Streams metrics
Public Interfaces
@InterfaceStability.Unstable
public interface StreamsMetrics {
/**
* Get read-only handle on global metrics registry
* @return Map of all metrics.
*/
Map<MetricName, ? extends Metric> metrics();
/**
* Add a latency sensor. This is equivalent to adding a sensor with metrics on latency and rate.
*
* @param scopeName Name of the scope, could be the type of the state store, etc.
* @param entityName Name of the entity, could be the name of the state store instance, etc.
* @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
* @param operationName Name of the operation, could be get / put / delete / etc.
* @param tags Additional tags of the sensor.
* @return The added sensor.
*/
Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordLevel recordLevel, String... tags);
/**
* Record the given latency value of the sensor.
* @param sensor sensor whose latency we are recording.
* @param startNs start of measurement time in nanoseconds.
* @param endNs end of measurement time in nanoseconds.
*/
void recordLatency(Sensor sensor, long startNs, long endNs);
/**
* Add a throughput sensor. This is equivalent to adding a sensor with metrics rate.
*
* @param scopeName Name of the scope, could be the type of the state store, etc.
* @param entityName Name of the entity, could be the name of the state store instance, etc.
* @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
* @param operationName Name of the operation, could be get / put / delete / etc.
* @param tags Additional tags of the sensor.
* @return The added sensor.
*/
Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordLevel recordLevel, String... tags);
/**
* Records the throughput value of a sensor.
* @param sensor sensor whose throughput we are recording.
* @param value throughput value.
*/
void recordThroughput(Sensor sensor, long value);
/**
* Generic sensor creation. Note that for most cases it is advisable to use {@link #addThroughputSensor(String, String, String, Sensor.RecordLevel, String...)}
* or {@link #addLatencySensor(String, String, String, Sensor.RecordLevel, String...)} to ensure metric name well-formedness and conformity with the rest
* of the streams code base.
* @param name Name of the sensor.
* @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
*/
Sensor addSensor(String name, Sensor.RecordLevel recordLevel);
/**
* Same as previous constructor {@link #addSensor(String, Sensor.RecordLevel, Sensor...)} sensor}, but takes a set of parents as well.
*
*/
Sensor addSensor(String name, Sensor.RecordLevel recordLevel, Sensor... parents);
/**
* Remove a sensor with the given name.
* @param sensor Sensor to be removed.
*/
void removeSensor(Sensor sensor);
}
/**
* Get read-only handle on global metrics registry
* @return Map of all metrics.
*/
Map<MetricName, ? extends Metric> metrics();
Enumeration of Sensors: This KIP proposes the introduction of the following sensors
Node punctuate time sensor: This sensor is associated with latency metrics depicting the average and max latency in the punctuate time of a node.
Node creation time sensor: This sensor is associated with latency metrics depicting the average and max latency in the creation time of a node.
Node destruction time sensor: This sensor is associated with latency metrics depicting the average and max latency in the destruction time of a node.
Node process time sensor: This sensor is associated with latency metrics depicting the average and max latency in the process time of a node.
Node throughput sensor: This sensor is associated with throughput metrics depicting the context forwarding rate of metrics through a node, i.e., indicating how many records were forwarded downstream from this processor node.
Addition of new sensors
Users can use the provided helped functions addLatencySensor and addThroughputSensor to register metrics and removeSensor to remove sensors. Note that addLatencySensor already existed in the code base.