Comment: addressed comment "list the added overloaded APIs for throughput sensors as well as with the recordLevel parameters, and mention what will be the default values for those added parameters in the existing API functions"

...

  • Enumeration of Sensors: This KIP proposes the introduction of the following sensors:

    • Node punctuate time sensor: This sensor is associated with latency metrics depicting the average and max latency in the punctuate time of a node.

    • Node creation time sensor: This sensor is associated with latency metrics depicting the average and max latency in the creation time of a node.

    • Node destruction time sensor: This sensor is associated with latency metrics depicting the average and max latency in the destruction time of a node.

    • Node process time sensor: This sensor is associated with latency metrics depicting the average and max latency in the process time of a node.

    • Node throughput sensor: This sensor is associated with throughput metrics depicting the context forwarding rate of records through a node, i.e., how many records were forwarded downstream from this processor node.

    • Skipped records sensor in StreamTask: This sensor is associated with a count metric, which helps monitor whether streams are well synchronized. The metric measures the difference in the total record count and the number of added records between the last record time. This is useful during debugging, as this count should not be off by too much during normal operations.

    • Adding a throughput sensor (either by calling the method addThroughputSensor or the method sensor) now takes a Sensor.RecordLevel, as follows:
      /**
       * Add a throughput sensor. This is equivalent to adding a sensor with metrics rate.
       *
       * @param scopeName Name of the scope, could be the type of the state store, etc.
       * @param entityName Name of the entity, could be the name of the state store instance, etc.
       * @param operationName Name of the operation, could be get / put / delete / etc.
       * @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
       * @param tags Additional tags of the sensor.
       * @return The added sensor.
       */
      Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordLevel recordLevel, String... tags);


      /**
       * Generic sensor creation. Note that for most cases it is advisable to use {@link #addThroughputSensor(String, String, String, Sensor.RecordLevel, String...)}
       * or {@link #addLatencySensor(String, String, String, Sensor.RecordLevel, String...)} to ensure metric name well-formedness and conformity with the rest
       * of the streams code base.
       *
       * @param scopeName Name of the scope, could be the type of the state store, etc.
       * @param entityName Name of the entity, could be the name of the state store instance, etc.
       * @param operationName Name of the operation, could be get / put / delete / etc.
       * @param recordLevel The recording level (e.g., INFO or DEBUG) for this sensor.
       * @return The added sensor.
       */
      Sensor sensor(String scopeName, String entityName, String operationName, Sensor.RecordLevel recordLevel);

      At the processor node level, throughput sensors are registered with a RecordLevel of DEBUG, i.e.,

      metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "process-throughput", Sensor.RecordLevel.DEBUG, tagKey, tagValue);
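
To illustrate what registering a sensor at DEBUG might mean in practice, here is a minimal, self-contained sketch. It does not use the Kafka classes: RecordLevel, CountingSensor, and the gating rule (a sensor records only when the configured level is at least as verbose as its own) are stand-ins and assumptions made for illustration, not definitions taken from this section.

```java
// Sketch only: RecordLevel, CountingSensor, and the gating rule below are
// hypothetical stand-ins for illustration, not the Kafka classes themselves.
class RecordLevelSketch {

    /** Stand-in for the proposed Sensor.RecordLevel; INFO is less verbose than DEBUG. */
    enum RecordLevel {
        INFO, DEBUG;

        /** Assumed rule: record only if the configured level is at least as verbose as this one. */
        boolean shouldRecord(RecordLevel configured) {
            return this.ordinal() <= configured.ordinal();
        }
    }

    /** Hypothetical sensor that counts only the events it actually records. */
    static final class CountingSensor {
        private final String name;
        private final RecordLevel level;      // level the sensor was registered with
        private final RecordLevel configured; // level the application is configured with
        private long count = 0;

        CountingSensor(String name, RecordLevel level, RecordLevel configured) {
            this.name = name;
            this.level = level;
            this.configured = configured;
        }

        /** Counts the event unless the sensor's level is filtered out by the configuration. */
        void record() {
            if (level.shouldRecord(configured)) {
                count++;
            }
        }

        long count() {
            return count;
        }

        String name() {
            return name;
        }
    }

    public static void main(String[] args) {
        // A DEBUG-level throughput sensor under an INFO configuration and under a DEBUG configuration.
        CountingSensor quiet = new CountingSensor("process-throughput", RecordLevel.DEBUG, RecordLevel.INFO);
        CountingSensor verbose = new CountingSensor("process-throughput", RecordLevel.DEBUG, RecordLevel.DEBUG);
        for (int i = 0; i < 3; i++) {
            quiet.record();
            verbose.record();
        }
        System.out.println(quiet.name() + " with INFO configured: " + quiet.count());
        System.out.println(verbose.name() + " with DEBUG configured: " + verbose.count());
    }
}
```

Under the assumed rule, the DEBUG-level sensor stays at 0 while the application is configured at INFO and reaches 3 once it is configured at DEBUG, so node-level throughput recording would cost nothing unless explicitly enabled.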

Compatibility, Deprecation, and Migration Plan

...