StreamsMetrics
Status
Current state: Accepted
...
JIRA:
...
```java
// deprecated APIs: use {@link Sensor#record(double)} directly instead.
@Deprecated
void recordLatency(final Sensor sensor, final long startNs, final long endNs);

@Deprecated
void recordThroughput(final Sensor sensor, final long value);

// updated APIs javadocs

/**
 * @deprecated since 2.5. Use {@link #addLatencyRateTotalSensor} instead.
 */
@Deprecated
Sensor addLatencyAndThroughputSensor(...);

/**
 * @deprecated since 2.5. Use {@link #addRateTotalSensor} instead.
 */
@Deprecated
Sensor addThroughputSensor(...);

/**
 * Add a latency, rate and total sensor for a specific operation, which will include the following metrics:
 * <ol>
 * <li>average latency</li>
 * <li>max latency</li>
 * <li>invocation rate (num.operations / time unit)</li>
 * <li>total invocation count</li>
 * </ol>
 * Whenever a user records this sensor via {@link Sensor#record(double)} etc.,
 * it will be counted as one invocation of the operation, and hence the rate / count metrics will be updated accordingly;
 * and the recorded latency value will be used to update the average / max latency as well.
 * The time unit of the latency can be defined by the user.
 *
 * Note that you can add more metrics to this sensor after creating it, which can then be updated upon
 * {@link Sensor#record(double)} calls; but additional user-customized metrics will not be managed by {@link StreamsMetrics}.
 *
 * @param scopeName      name of the scope, which will be used as part of the metrics type, e.g.: "stream-[scope]-metrics".
 * @param entityName     name of the entity, which will be used as part of the metric tags, e.g.: "[scope]-id" = "[entity]".
 * @param operationName  name of the operation, which will be used as the name of the metric, e.g.: "[operation]-latency-avg".
 * @param recordingLevel the recording level (e.g., INFO or DEBUG) for this sensor.
 * @param tags           additional tags of the sensor
 * @return The added sensor.
 */
Sensor addLatencyRateTotalSensor(final String scopeName,
                                 final String entityName,
                                 final String operationName,
                                 final Sensor.RecordingLevel recordingLevel,
                                 final String... tags);

/**
 * Add a rate and a total sensor for a specific operation, which will include the following metrics:
 * <ol>
 * <li>invocation rate (num.operations / time unit)</li>
 * <li>total invocation count</li>
 * </ol>
 * Whenever a user records this sensor via {@link Sensor#record(double)} etc.,
 * it will be counted as one invocation of the operation, and hence the rate / count metrics will be updated accordingly.
 *
 * Note that you can add more metrics to this sensor after creating it, which can then be updated upon
 * {@link Sensor#record(double)} calls; but additional user-customized metrics will not be managed by {@link StreamsMetrics}.
 *
 * @param scopeName      name of the scope, which will be used as part of the metrics type, e.g.: "stream-[scope]-metrics".
 * @param entityName     name of the entity, which will be used as part of the metric tags, e.g.: "[scope]-id" = "[entity]".
 * @param operationName  name of the operation, which will be used as the name of the metric, e.g.: "[operation]-rate".
 * @param recordingLevel the recording level (e.g., INFO or DEBUG) for this sensor.
 * @param tags           additional tags of the sensor
 * @return The added sensor.
 */
Sensor addRateTotalSensor(final String scopeName,
                          final String entityName,
                          final String operationName,
                          final Sensor.RecordingLevel recordingLevel,
                          final String... tags);
```
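To make the javadoc semantics concrete, here is a small, self-contained Java model of what a latency-rate-total sensor tracks: every `record(latency)` call counts as one invocation (updating the total and the rate), and the recorded value feeds the average and max latency. The class `LatencyRateTotalModel` is hypothetical and is not Kafka's implementation; in particular, Kafka computes rates over sampled time windows, whereas this sketch simply divides the count by the elapsed time since creation. The suffixes `-latency-max`, `-rate` and `-total` are assumed to follow the same `[operation]-...` pattern as the `[operation]-latency-avg` example in the javadoc.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of a latency-rate-total sensor.
 * Not Kafka's implementation: Kafka uses sampled, windowed stats for rates.
 */
final class LatencyRateTotalModel {
    private final String operation;
    private final long createdMs;
    private long count = 0;
    private double latencySum = 0.0;
    private double latencyMax = Double.NaN;

    LatencyRateTotalModel(String operation, long createdMs) {
        this.operation = operation;
        this.createdMs = createdMs;
    }

    /** One call is one invocation of the operation; the value is that invocation's latency. */
    void record(double latency) {
        count++;
        latencySum += latency;
        latencyMax = Double.isNaN(latencyMax) ? latency : Math.max(latencyMax, latency);
    }

    /** Metric values keyed by names following the assumed "[operation]-..." pattern. */
    Map<String, Double> snapshot(long nowMs) {
        // Simplification: rate = invocations / seconds since creation (at least 1 ms).
        double elapsedSec = Math.max(1L, nowMs - createdMs) / 1000.0;
        Map<String, Double> metrics = new LinkedHashMap<>();
        metrics.put(operation + "-latency-avg", count == 0 ? Double.NaN : latencySum / count);
        metrics.put(operation + "-latency-max", latencyMax);
        metrics.put(operation + "-rate", count / elapsedSec);
        metrics.put(operation + "-total", (double) count);
        return metrics;
    }

    public static void main(String[] args) {
        LatencyRateTotalModel sensor = new LatencyRateTotalModel("my-op", 0L);
        sensor.record(2.0);
        sensor.record(4.0);
        sensor.record(6.0);
        System.out.println(sensor.snapshot(2_000L));
        // {my-op-latency-avg=4.0, my-op-latency-max=6.0, my-op-rate=1.5, my-op-total=3.0}
    }
}
```

The rate/total pair alone (as in `addRateTotalSensor`) corresponds to the last two entries of the snapshot; the latency entries only make sense when the recorded value is a duration.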
...
| Metric | LEVEL 0: Per-Client | LEVEL 1: Per-Thread | LEVEL 2: Per-Task | LEVEL 3: Per-Processor-Node | LEVEL 3: Per-State-Store | LEVEL 3: Per-Cache |
|---|---|---|---|---|---|---|
| TAGS | type=stream-metrics,client-id=[client-id] | type=stream-thread-metrics,thread-id=[threadId] (! tag name changed) | type=stream-task-metrics,thread-id=[threadId],task-id=[taskId] (! tag name changed) | type=stream-processor-node-metrics,thread-id=[threadId],task-id=[taskId],processor-node-id=[processorNodeId] (! tag name changed) | type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],[storeType]-state-id=[storeName] (! tag name changed) | type=stream-record-cache-metrics,thread-id=[threadId],task-id=[taskId],record-cache-id=[storeName] (! tag name changed) |
| version \| commit-id (static gauge) | INFO ($) | | | | | |
| application-id (static gauge) | INFO ($) | | | | | |
| topology-description (static gauge) | INFO ($) | | | | | |
| state (dynamic gauge) | INFO ($) | | | | | |
| alive-stream-threads (dynamic gauge) | INFO ($) | | | | | |
| process-latency (avg \| max) | | INFO | DEBUG | (! removed for now) | | |
| process (rate \| total) | | INFO | DEBUG | DEBUG on source-nodes only | | |
| punctuate-latency (avg \| max) | | INFO | DEBUG | | | |
| punctuate (rate \| total) | | INFO | DEBUG | | | |
| commit-latency (avg \| max) | | INFO | DEBUG | | | |
| commit (rate \| total) | | INFO | DEBUG | | | |
| poll-latency (avg \| max) | | INFO | | | | |
| poll (rate \| total) | | INFO | | | | |
| process \| punctuate \| commit \| poll-ratio (dynamic gauge) | | INFO | | | | |
| task-created \| closed (rate \| total) | | INFO | | | | |
| poll-records (avg \| max) | | INFO | | | | |
| process-records (avg \| max) | | INFO | | | | |
| active-process-ratio (dynamic gauge) | | | INFO ($) (percentage of time the hosting thread spends on this active task) | | | |
| standby-process-ratio (dynamic gauge) | | | INFO ($) (percentage of time the hosting thread spends on this standby task) | | | |
| dropped-records (rate \| total) | | | INFO ($) (number of records dropped within this task, for any reason) | | | |
| active-buffer-count (dynamic gauge) | | | DEBUG | | | |
| enforced-processing (rate \| total) | | | DEBUG | | | |
| record-lateness (avg \| max) | | | DEBUG | | | |
| suppression-emit (rate \| total) | | | | DEBUG * (suppress processor only) | | |
| suppression-buffer-size (avg \| max) | | | | | DEBUG * (suppression buffer only) | |
| suppression-buffer-count (avg \| max) | | | | | DEBUG * (suppression buffer only) | |
| (put \| put-if-absent .. \| get)-latency (avg \| max) | | | | | DEBUG * (excluding suppression buffer) (! name changed) | |
| (put \| put-if-absent .. \| get) (rate) | | | | | DEBUG * (excluding suppression buffer) (! name changed) | |
| hit-ratio (avg \| min \| max) | | | | | | DEBUG (! name changed) |
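The TAGS row follows a nesting pattern: task tags add `task-id` to the thread tags, and the processor-node, state-store and cache tags each add one more id on top of the task tags. A hypothetical helper, `StreamsTagSketch`, that assembles these strings might look like the sketch below. It is illustrative only and not part of Kafka's API; the values `t1`, `0_1`, `counts` and the store type `rocksdb` (standing in for `[storeType]`) are made-up example inputs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that assembles tag strings following the TAGS row. */
final class StreamsTagSketch {
    static Map<String, String> clientTags(String clientId) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("client-id", clientId);
        return tags;
    }

    static Map<String, String> threadTags(String threadId) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("thread-id", threadId);
        return tags;
    }

    // Each lower level starts from its parent's tags and appends one more id.
    static Map<String, String> taskTags(String threadId, String taskId) {
        Map<String, String> tags = threadTags(threadId);
        tags.put("task-id", taskId);
        return tags;
    }

    static Map<String, String> processorNodeTags(String threadId, String taskId, String nodeId) {
        Map<String, String> tags = taskTags(threadId, taskId);
        tags.put("processor-node-id", nodeId);
        return tags;
    }

    static Map<String, String> stateStoreTags(String threadId, String taskId, String storeType, String storeName) {
        Map<String, String> tags = taskTags(threadId, taskId);
        tags.put(storeType + "-state-id", storeName); // "[storeType]-state-id"
        return tags;
    }

    static Map<String, String> cacheTags(String threadId, String taskId, String storeName) {
        Map<String, String> tags = taskTags(threadId, taskId);
        tags.put("record-cache-id", storeName);
        return tags;
    }

    /** Renders "type=<group>,k1=v1,k2=v2,..." in insertion order. */
    static String render(String type, Map<String, String> tags) {
        StringJoiner joiner = new StringJoiner(",");
        joiner.add("type=" + type);
        tags.forEach((k, v) -> joiner.add(k + "=" + v));
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(render("stream-task-metrics", taskTags("t1", "0_1")));
        // type=stream-task-metrics,thread-id=t1,task-id=0_1
        System.out.println(render("stream-state-metrics", stateStoreTags("t1", "0_1", "rocksdb", "counts")));
        // type=stream-state-metrics,thread-id=t1,task-id=0_1,rocksdb-state-id=counts
    }
}
```

Building each level from its parent keeps the shared `thread-id` / `task-id` prefix consistent, which is what lets a user group lower-level metrics by thread or task when rolling them up.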
...
- We will remove most of the parent sensors with `level-tag=all`, except in one case. The main idea is to let users do the roll-ups themselves only when necessary, so that we avoid unnecessary metric value aggregations. For this exceptional case, one parent-child sensor relationship is maintained because it is tricky for users to do the roll-up correctly.
- We will keep all LEVEL-0 (instance) and LEVEL-1 (thread) sensors at INFO reporting level, and most lower-level sensors at DEBUG. The only exceptions are the active/standby task process ratios and the dropped/skipped records, which are reported at INFO.
- The active/standby task process ratios indicate the percentage of time the hosting thread spends processing the respective active or standby task.
- Dropped/skipped records indicate unexpected errors during processing and hence need users' attention. Their semantics differ slightly, though: skipped records are skipped at the very beginning of processing and hence never traverse the topology; dropped records are dropped in the middle of the topology and do not necessarily map 1:1 to source records, since one source record may be transformed into multiple intermediate records that are then dropped later.
- For metrics that are only useful for a specific type of entity, such as "suppression-emit", we will create the sensors lazily, so that metrics reporters do not pay the cost of iterating over empty sensors.
- Some lower-level metrics, such as "forward-rate" and "destroy-rate", are removed outright because they overlap with existing metrics. Here is the list of removed / replaced sensors:
```
late-records-drop: INFO at processor-node level, replaced by INFO task-level "dropped-records".
skipped-records: INFO at thread and processor-node level, replaced by INFO task-level "dropped-records".
expired-window-record-drop: DEBUG at state-store level, replaced by INFO task-level "dropped-records".
forward-rate: DEBUG at processor-node level, replaced by DEBUG processor-node-level "process-rate".
destroy-rate: DEBUG at processor-node level, covered by INFO thread-level "task-closed-rate".
create-rate: DEBUG at processor-node level, covered by INFO thread-level "task-created-rate".
```
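The roll-up point in the first bullet can be illustrated with a short, hypothetical Java sketch. Counts and totals roll up by summation and max by taking the maximum, but averages must be weighted by invocation count, so simply averaging per-task averages gives a misleading result. The class and field names are invented for illustration, and the sketch assumes each task's average and count cover the same time window; which parent-child sensor the KIP actually retains is not modeled here.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: rolling per-task latency readings up to a thread-level view. */
final class RollUpSketch {
    /** Per-task readings a user might collect: average latency, invocation count, max latency. */
    static final class TaskReading {
        final double latencyAvg;
        final long count;
        final double latencyMax;

        TaskReading(double latencyAvg, long count, double latencyMax) {
            this.latencyAvg = latencyAvg;
            this.count = count;
            this.latencyMax = latencyMax;
        }
    }

    /** Counts and totals roll up by plain summation. */
    static long rollUpCount(List<TaskReading> tasks) {
        long total = 0;
        for (TaskReading t : tasks) {
            total += t.count;
        }
        return total;
    }

    /** Averages must be weighted by each task's invocation count. */
    static double rollUpAvg(List<TaskReading> tasks) {
        long count = 0;
        double weightedSum = 0.0;
        for (TaskReading t : tasks) {
            count += t.count;
            weightedSum += t.latencyAvg * t.count;
        }
        return count == 0 ? Double.NaN : weightedSum / count;
    }

    /** The max over all tasks is the max of the per-task maxima. */
    static double rollUpMax(List<TaskReading> tasks) {
        double max = Double.NEGATIVE_INFINITY;
        for (TaskReading t : tasks) {
            max = Math.max(max, t.latencyMax);
        }
        return max;
    }

    /** Naive (incorrect) roll-up, for comparison: the unweighted mean of per-task averages. */
    static double naiveAvgOfAvgs(List<TaskReading> tasks) {
        double sum = 0.0;
        for (TaskReading t : tasks) {
            sum += t.latencyAvg;
        }
        return tasks.isEmpty() ? Double.NaN : sum / tasks.size();
    }

    public static void main(String[] args) {
        List<TaskReading> tasks = Arrays.asList(
                new TaskReading(10.0, 1, 10.0), // a task that ran once, slowly
                new TaskReading(1.0, 9, 1.5));  // a busy task with fast invocations
        System.out.println(rollUpCount(tasks));    // 10
        System.out.println(rollUpAvg(tasks));      // 1.9
        System.out.println(naiveAvgOfAvgs(tasks)); // 5.5
        System.out.println(rollUpMax(tasks));      // 10.0
    }
}
```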
Release History
2.4
- client-level metrics:
  - added:
    - version
    - commit-id
    - application-id
    - topology-description
    - state
2.5
- thread-level metrics:
  - refactored:
    - process-latency (avg | max)
    - process (rate | total)
    - punctuate-latency (avg | max)
    - punctuate (rate | total)
    - commit-latency (avg | max)
    - commit (rate | total)
    - poll-latency (avg | max)
    - poll (rate | total)
    - task-created | closed (rate | total)
  - removed:
    - skipped-records
- task-level metrics:
  - refactored:
    - process-latency (avg | max)
    - process (rate | total)
    - punctuate-latency (avg | max)
    - punctuate (rate | total)
    - commit-latency (avg | max)
    - commit (rate | total)
    - enforced-processing (rate | total)
    - record-lateness (avg | max)
  - added:
    - dropped-records (rate | total)
  - removed:
    - expired-window-record-drop
- processor-node-level:
  - refactored:
    - process (rate | total)
    - suppression-emit (rate | total)
  - removed:
    - process-latency (avg | max)
    - late-records-drop
    - skipped-records
    - forward-rate
    - destroy-rate
    - create-rate
- state-store-level:
  - refactored:
    - suppression-buffer-size (avg | max)
    - suppression-buffer-count (avg | max)
    - (put | put-if-absent .. | get)-latency (avg | max)
    - (put | put-if-absent .. | get) (rate)
  - removed:
    - expired-window-record-drop
- cache-level:
  - refactored:
    - hit-ratio (avg | min | max)
2.6
- client-level metrics:
  - added:
    - alive-stream-threads
- thread-level metrics:
  - added:
    - process | punctuate | commit | poll-ratio
    - poll-records (avg | max)
    - process-records (avg | max)
- task-level metrics:
  - added:
    - active-process-ratio
Proposed Changes
As above.
Compatibility, Deprecation, and Migration Plan
...
```
"built.in.metrics.version":
  type: Enum
  values: {"0.10.0-2.4", "latest"}
  default: "latest"
```
When users set it to "0.10.0-2.4", the old metric names and tags are still used.
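A minimal, dependency-free Java sketch of how an application might opt into the old metric names/tags, using plain `java.util.Properties`, the key `built.in.metrics.version`, and the two values `"0.10.0-2.4"` and `"latest"`. The validation helper `MetricsVersionConfigSketch` is hypothetical; Kafka performs its own enum validation through its config definitions. Recent Kafka releases also expose the key as the constant `StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG`, which real code would normally use instead of the string literal.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical sketch of reading and validating the built-in metrics version setting. */
final class MetricsVersionConfigSketch {
    static final String KEY = "built.in.metrics.version";
    static final String OLD_METRICS = "0.10.0-2.4";
    static final String LATEST = "latest";
    static final List<String> ALLOWED = Arrays.asList(OLD_METRICS, LATEST);

    /** Returns the configured value (default "latest"), rejecting anything outside the enum. */
    static String effectiveVersion(Properties props) {
        String value = props.getProperty(KEY, LATEST);
        if (!ALLOWED.contains(value)) {
            throw new IllegalArgumentException("Invalid value for " + KEY + ": " + value);
        }
        return value;
    }

    /** True when the old (pre-KIP) metric names and tags would be used. */
    static boolean usesOldMetrics(Properties props) {
        return OLD_METRICS.equals(effectiveVersion(props));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(effectiveVersion(props)); // latest
        props.setProperty(KEY, OLD_METRICS);
        System.out.println(usesOldMetrics(props));   // true
    }
}
```

Collapsing all pre-2.5 versions into a single `"0.10.0-2.4"` value keeps the enum small: there are only two naming schemes to choose between, old and latest.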
...