StreamsMetrics

Status

Current state: Accepted

...

JIRA:

KAFKA-6820, KAFKA-6819, KAFKA-7963, KAFKA-8856, KAFKA-9230, KAFKA-9753

...

Code Block
// deprecated APIs: use {@link Sensor#record(double)} directly instead.

@Deprecated
void recordLatency(final Sensor sensor, final long startNs, final long endNs);

@Deprecated
void recordThroughput(final Sensor sensor, final long value);

// deprecated APIs: replaced by the renamed methods below.

/**
 * @deprecated since 2.5. Use {@link #addLatencyRateTotalSensor} instead.
 */
@Deprecated
Sensor addLatencyAndThroughputSensor(... );

/**
 * @deprecated since 2.5. Use {@link #addRateTotalSensor} instead.
 */
@Deprecated
Sensor addThroughputSensor(... );

// updated APIs javadocs

  /**
   * Add a latency, rate and total sensor for a specific operation, which will include the following metrics:
   * <ol>
   * <li>average latency</li>
   * <li>max latency</li>
   * <li>invocation rate (num.operations / time unit)</li>
   * <li>total invocation count</li>
   * </ol>
   * Whenever a user records this sensor via {@link Sensor#record(double)} etc,
   * it will be counted as one invocation of the operation, and hence the rate / count metrics will be updated accordingly;
   * and the recorded latency value will be used to update the average / max latency as well. The time unit of the latency can be defined
   * by the user.
   *
   * Note that you can add more metrics to this sensor after creating it, which can then be updated upon {@link Sensor#record(double)} calls;
   * but additional user-customized metrics will not be managed by {@link StreamsMetrics}.
   *
   * @param scopeName          name of the scope, which will be used as part of the metrics type, e.g.: "stream-[scope]-metrics".
   * @param entityName         name of the entity, which will be used as part of the metric tags, e.g.: "[scope]-id" = "[entity]".
   * @param operationName      name of the operation, which will be used as the name of the metric, e.g.: "[operation]-latency-avg".
   * @param recordingLevel     the recording level (e.g., INFO or DEBUG) for this sensor.
   * @param tags               additional tags of the sensor
   * @return The added sensor.
   */
  Sensor addLatencyRateTotalSensor(final String scopeName,
                                   final String entityName,
                                   final String operationName,
                                   final Sensor.RecordingLevel recordingLevel,
                                   final String... tags);

  /**
   * Add a rate and a total sensor for a specific operation, which will include the following metrics:
   * <ol>
   * <li>invocation rate (num.operations / time unit)</li>
   * <li>total invocation count</li>
   * </ol>
   * Whenever a user records this sensor via {@link Sensor#record(double)} etc,
   * it will be counted as one invocation of the operation, and hence the rate / count metrics will be updated accordingly.
   *
   * Note that you can add more metrics to this sensor after creating it, which can then be updated upon {@link Sensor#record(double)} calls;
   * but additional user-customized metrics will not be managed by {@link StreamsMetrics}.
   *
   * @param scopeName          name of the scope, which will be used as part of the metrics type, e.g.: "stream-[scope]-metrics".
   * @param entityName         name of the entity, which will be used as part of the metric tags, e.g.: "[scope]-id" = "[entity]".
   * @param operationName      name of the operation, which will be used as the name of the metric, e.g.: "[operation]-rate".
   * @param recordingLevel     the recording level (e.g., INFO or DEBUG) for this sensor.
   * @param tags               additional tags of the sensor
   * @return The added sensor.
   */
  Sensor addRateTotalSensor(final String scopeName,
                            final String entityName,
                            final String operationName,
                            final Sensor.RecordingLevel recordingLevel,
                            final String... tags);
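
To make the javadoc semantics above concrete, here is a minimal, self-contained sketch of how one `record(double)` call could feed the latency and count metrics, and how the scope, entity and operation names map to the metric type, tag and metric names. It is not the Kafka implementation: the class `LatencySensorSketch` is hypothetical, the names "[operation]-latency-max" and "[operation]-total" are assumed by analogy with "[operation]-latency-avg", and the rate metric is left out because it needs a time window.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative stand-in for a latency / rate / total sensor; NOT the Kafka implementation. */
final class LatencySensorSketch {
    private final String metricType;                 // "stream-[scope]-metrics"
    private final Map<String, String> tags = new LinkedHashMap<>();
    private final String operationName;
    private long count;                              // total invocation count
    private double sum;                              // sum of all recorded latencies
    private double max = Double.NaN;                 // NaN until the first record

    LatencySensorSketch(final String scopeName, final String entityName, final String operationName) {
        this.metricType = "stream-" + scopeName + "-metrics";
        this.tags.put(scopeName + "-id", entityName); // "[scope]-id" = "[entity]"
        this.operationName = operationName;
    }

    /** Each call counts as one invocation and feeds the value into avg / max. */
    void record(final double latency) {
        count++;
        sum += latency;
        max = (count == 1) ? latency : Math.max(max, latency);
    }

    /** Snapshot of avg, max and total; metric name suffixes other than "-latency-avg" are assumed. */
    Map<String, Double> snapshot() {
        final Map<String, Double> metrics = new LinkedHashMap<>();
        metrics.put(operationName + "-latency-avg", count == 0 ? Double.NaN : sum / count);
        metrics.put(operationName + "-latency-max", max);
        metrics.put(operationName + "-total", (double) count);
        return metrics;
    }

    String metricType() {
        return metricType;
    }

    Map<String, String> tags() {
        return tags;
    }
}
```

Recording 2.0 and then 4.0 on such a sensor would yield an average of 3.0, a max of 4.0 and a total of 2, in whatever time unit the caller chose for the latency values.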

...


Metric levels and tags:

  • LEVEL 0 (Per-Client): type=stream-metrics,client-id=[client-id]
  • LEVEL 1 (Per-Thread): type=stream-thread-metrics,thread-id=[threadId] (! tag name changed)
  • LEVEL 2 (Per-Task): type=stream-task-metrics,thread-id=[threadId],task-id=[taskId] (! tag name changed)
  • LEVEL 3 (Per-Processor-Node): type=stream-processor-node-metrics,thread-id=[threadId],task-id=[taskId],processor-node-id=[processorNodeId] (! tag name changed)
  • LEVEL 3 (Per-State-Store): type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],[storeType]-state-id=[storeName] (! tag name changed)
  • LEVEL 3 (Per-Cache): type=stream-record-cache-metrics,thread-id=[threadId],task-id=[taskId],record-cache-id=[storeName] (! tag name changed)

Metrics and their recording level at each level where they are reported:

  • version | commit-id (static gauge): Per-Client INFO ($)
  • application-id (static gauge): Per-Client INFO ($)
  • topology-description (static gauge): Per-Client INFO ($)
  • state (dynamic gauge): Per-Client INFO ($)
  • alive-stream-threads (dynamic gauge): Per-Client INFO ($)
  • process-latency (avg | max): Per-Thread INFO; Per-Task DEBUG; Per-Processor-Node (! removed for now)
  • process (rate | total): Per-Thread INFO; Per-Task DEBUG; Per-Processor-Node DEBUG on source-nodes only
  • punctuate-latency (avg | max): Per-Thread INFO; Per-Task DEBUG
  • punctuate (rate | total): Per-Thread INFO; Per-Task DEBUG
  • commit-latency (avg | max): Per-Thread INFO; Per-Task DEBUG
  • commit (rate | total): Per-Thread INFO; Per-Task DEBUG
  • poll-latency (avg | max): Per-Thread INFO
  • poll (rate | total): Per-Thread INFO
  • process | punctuate | commit | poll-ratio (dynamic gauge): Per-Thread INFO
  • task-created | closed (rate | total): Per-Thread INFO
  • poll-records (avg | max): Per-Thread INFO
  • process-records (avg | max): Per-Thread INFO
  • active-process-ratio (dynamic gauge): Per-Task INFO ($) (percentage of time the hosting thread is spending with this active task)
  • standby-process-ratio (dynamic gauge): Per-Task INFO ($) (percentage of time the hosting thread is spending with this standby task)
  • dropped-records (rate | total): Per-Task INFO ($) (number of records dropped within this task due to all kinds of scenarios)
  • active-buffer-count (dynamic gauge): Per-Task DEBUG
  • enforced-processing (rate | total): Per-Task DEBUG
  • record-lateness (avg | max): Per-Task DEBUG
  • suppression-emit (rate | total): Per-Processor-Node DEBUG * (suppress processor only)
  • suppression-buffer-size (avg | max): Per-State-Store DEBUG * (suppression buffer only)
  • suppression-buffer-count (avg | max): Per-State-Store DEBUG * (suppression buffer only)
  • (put | put-if-absent .. | get)-latency (avg | max): Per-State-Store DEBUG * (excluding suppression buffer) (! name changed)
  • (put | put-if-absent .. | get) (rate): Per-State-Store DEBUG * (excluding suppression buffer) (! name changed)
  • hit-ratio (avg | min | max): Per-Cache DEBUG (! name changed)
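
The tag layout per level can be illustrated with a small sketch that builds the tag sets for the thread, task and processor-node levels and renders them in the `key=value,...` form used above. The class `MetricTagsSketch` and its method names are hypothetical helpers for illustration only; Kafka Streams builds these tags internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that mirrors the tag layout of the thread, task and processor-node levels. */
final class MetricTagsSketch {
    private MetricTagsSketch() { }

    static Map<String, String> threadTags(final String threadId) {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put("type", "stream-thread-metrics");
        tags.put("thread-id", threadId);
        return tags;
    }

    static Map<String, String> taskTags(final String threadId, final String taskId) {
        final Map<String, String> tags = threadTags(threadId);
        // Overwriting an existing key keeps its position in a LinkedHashMap.
        tags.put("type", "stream-task-metrics");
        tags.put("task-id", taskId);
        return tags;
    }

    static Map<String, String> processorNodeTags(final String threadId,
                                                 final String taskId,
                                                 final String processorNodeId) {
        final Map<String, String> tags = taskTags(threadId, taskId);
        tags.put("type", "stream-processor-node-metrics");
        tags.put("processor-node-id", processorNodeId);
        return tags;
    }

    /** Renders tags as "key=value,key=value" in insertion order. */
    static String render(final Map<String, String> tags) {
        return tags.entrySet().stream()
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining(","));
    }
}
```

Each lower level carries all tags of its parent level plus its own id, which is what allows users to roll metrics up themselves by grouping on `thread-id` or `task-id`.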

...

  1. We will remove most of the parent sensors with `level-tag=all`, except in one case. The main idea is to let users do roll-ups themselves only if necessary, so that we can save unnecessary metric value aggregations. For this exceptional case, one parent-child sensor relationship is maintained because it is a bit tricky for users to do the roll-up correctly.
  2. We will keep all LEVEL-0 (instance) and LEVEL-1 (thread) sensors at INFO, and most lower-level sensors at DEBUG recording level. The only exceptions are active/standby-process-ratio and dropped/skipped-records:
    1. active/standby-process-ratio indicates the percentage of time that the hosting thread spends processing the task.
    2. dropped/skipped records indicate unexpected errors during processing and hence need users' attention. Their semantics differ slightly, though: skipped records are skipped at the very beginning of processing and hence never traverse the topology at all; dropped records are dropped in the middle of the topology, and do not necessarily map 1-1 to source records, since one source record may be transformed into multiple intermediate records which are then dropped later.
  3. For some metrics that are only useful for a specific type of entity, like "suppression-emit", we will create the sensors lazily in order to save metrics reporters the unnecessary cost of iterating over empty sensors.
  4. Some of the lower-level metrics like "forward-rate" and "destroy-rate" are removed directly since they overlap with other existing metrics. Here is a list of removed / replaced sensors:

Code Block
late-records-drop: INFO at processor node level, replaced by INFO task-level "dropped-records".

skipped-records: INFO at thread and processor node level, replaced by INFO task-level "dropped-records".

expired-window-record-drop: DEBUG at state store level, replaced by INFO task-level "dropped-records".

forward-rate: DEBUG at processor-node level, replaced by DEBUG processor node level "process-rate".

destroy-rate: DEBUG at processor-node level, covered by INFO thread-level "task-closed-rate".

create-rate: DEBUG at processor-node level, covered by INFO thread-level "task-created-rate".
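
For dashboards or alerts that still reference the removed sensors, the list above can be captured as a simple lookup table. This is only a sketch: `RemovedSensorLookup` is a hypothetical helper, not part of Kafka, and the thread-level names "task-closed-rate" and "task-created-rate" are assumed to follow the "task-created | closed (rate | total)" entry of the metrics table.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical migration helper: maps a removed sensor name to the metric that replaces or covers it. */
final class RemovedSensorLookup {
    private static final Map<String, String> REPLACEMENTS = Map.of(
        "late-records-drop", "dropped-records",          // now INFO at task level
        "skipped-records", "dropped-records",            // now INFO at task level
        "expired-window-record-drop", "dropped-records", // now INFO at task level
        "forward-rate", "process-rate",                  // DEBUG at processor-node level
        "destroy-rate", "task-closed-rate",              // INFO at thread level
        "create-rate", "task-created-rate"               // INFO at thread level
    );

    private RemovedSensorLookup() { }

    /** Returns the replacement metric name, or empty if the sensor was not removed. */
    static Optional<String> replacementFor(final String removedSensorName) {
        return Optional.ofNullable(REPLACEMENTS.get(removedSensorName));
    }
}
```

Note that the three record-drop sensors collapse into a single task-level metric, so the per-cause breakdown they used to provide is not recoverable from "dropped-records" alone.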


Release History

2.4

  • client-level metrics:
    • added:
      • version 
      • commit-id
      • application-id
      • topology-description
      • state

2.5

  • thread-level metrics:
    • refactored:
      • process-latency (avg | max)
      • process (rate | total) 
      • punctuate-latency (avg | max)
      • punctuate (rate | total)
      • commit-latency (avg | max)
      • commit (rate | total)
      • poll-latency (avg | max)
      • poll (rate | total)
      • task-created | closed (rate | total)
    • removed:
      • skipped-records
  • task-level metrics:
    • refactored:
      • process-latency (avg | max)
      • process (rate | total) 
      • punctuate-latency (avg | max)
      • punctuate (rate | total)
      • commit-latency (avg | max)
      • commit (rate | total)
      • enforced-processing (rate | total)
      • record-lateness (avg | max)
    • added:
      • dropped-records (rate | total)
    • removed:
      • expired-window-record-drop
  • processor-node-level:
    • refactored:
      • process (rate | total)
      • suppression-emit (rate | total)
    • removed:
      • process-latency (avg | max)
      • late-records-drop
      • skipped-records
      • forward-rate
      • destroy-rate
      • create-rate
  • state-store-level:
    • refactored:
      • suppression-buffer-size (avg | max)
      • suppression-buffer-count (avg | max)
      • (put | put-if-absent .. | get)-latency (avg | max)
      • (put | put-if-absent .. | get) (rate)
    • removed:
      • expired-window-record-drop
  • cache-level:
    • refactored:
      • hit-ratio (avg | min | max)

2.6

  • client-level metrics:
    • added: 
      • alive-stream-threads
  • thread-level metrics:
    • added: 
      • process | punctuate | commit | poll-ratio
      • poll-records (avg | max)
      • process-records (avg | max)
  • task-level metrics:
    • added: 
      • active-process-ratio

Proposed Changes

As above.

Compatibility, Deprecation, and Migration Plan

...

Code Block
"built.in.metrics.version":


type: Enum 
values: {"0.10.0-2.4", "latest"}
default: "latest"


When users override it to "0.10.0-2.4", the old metrics names / tags will still be used.
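
As a rough illustration of opting back into the old metric names, the sketch below sets the config key on a plain `java.util.Properties` object. It assumes the value for the old behaviour is "0.10.0-2.4"; the class `BuiltInMetricsVersionConfig` and its method are hypothetical, and the key is written as a string literal rather than through any Kafka constant.

```java
import java.util.Properties;

/** Hypothetical helper showing how the metrics version override could be set on a Streams config. */
final class BuiltInMetricsVersionConfig {
    // Config key as named in this document.
    static final String BUILT_IN_METRICS_VERSION = "built.in.metrics.version";

    private BuiltInMetricsVersionConfig() { }

    /** Returns a copy of the given properties that requests the old metric names / tags. */
    static Properties withOldMetrics(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);
        // Assumed value for the pre-refactoring metrics; the default is "latest".
        props.setProperty(BUILT_IN_METRICS_VERSION, "0.10.0-2.4");
        return props;
    }
}
```

Copying the properties instead of mutating the input keeps the caller's base configuration reusable for instances that should run with the default.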

...