

Current state: Accepted

Discussion thread: TBD link


JIRA: KAFKA-6820, KAFKA-6819, KAFKA-7963, KAFKA-8856, KAFKA-9230, KAFKA-5676, KAFKA-9753


For the above purposes, we want to 1) clean up the Streams built-in metrics so that more of them are useful out of the box while trimming the non-useful ones, because the current API names make it hard to reason about their semantics (this proposal includes removing some redundant APIs as well as refactoring the parent-child metrics relationships; details below), and 2) improve the APIs for user-customized metrics that let users register their own metrics, based on the "operationName / scopeName / entityName" notions; we would simplify this interface for users' needs and make sure it functions correctly.


Code Block
// deprecated APIs: use {@link Sensor#record(double)} directly instead.

void recordLatency(final Sensor sensor, final long startNs, final long endNs);

void recordThroughput(final Sensor sensor, final long value);

/** @deprecated since 2.5. Use {@link addLatencyRateTotalSensor} instead */
@Deprecated
Sensor addLatencyAndThroughputSensor(... )

/** @deprecated since 2.5. Use {@link addRateTotalSensor} instead */
Sensor addThroughputSensor(... )

// updated APIs javadocs

  /**
   * Add a latency, rate and total sensor for a specific operation, which will include the following metrics:
   * <ol>
   * <li>average latency</li>
   * <li>max latency</li>
   * <li>invocation rate (num.operations / time unit)</li>
   * <li>total invocation count</li>
   * </ol>
   * Whenever a user records this sensor via {@link Sensor#record(double)} etc,
   * it will be counted as one invocation of the operation, and hence the rate / count metrics will be updated accordingly;
   * and the recorded latency value will be used to update the average / max latency as well. The time unit of the latency can be defined
   * by the user.
   *
   * Note that you can add more metrics to this sensor after creating it, which can then be updated upon {@link Sensor#record(double)} calls;
   * but additional user-customized metrics will not be managed by {@link StreamsMetrics}.
   *
   * @param scopeName          name of the scope, which will be used as part of the metrics type, e.g.: "stream-[scope]-metrics".
   * @param entityName         name of the entity, which will be used as part of the metric tags, e.g.: "[scope]-id" = "[entity]".
   * @param operationName      name of the operation, which will be used as the name of the metric, e.g.: "[operation]-latency-avg".
   * @param recordingLevel     the recording level (e.g., INFO or DEBUG) for this sensor.
   * @param tags               additional tags of the sensor
   * @return The added sensor.
   */
  Sensor addLatencyRateTotalSensor(final String scopeName,
                                   final String entityName,
                                   final String operationName,
                                   final Sensor.RecordingLevel recordingLevel,
                                   final String... tags);

  /**
   * Add a rate and a total sensor for a specific operation, which will include the following metrics:
   * <ol>
   * <li>invocation rate (num.operations / time unit)</li>
   * <li>total invocation count</li>
   * </ol>
   * Whenever a user records this sensor via {@link Sensor#record(double)} etc,
   * it will be counted as one invocation of the operation, and hence the rate / count metrics will be updated accordingly.
   *
   * Note that you can add more metrics to this sensor after creating it, which can then be updated upon {@link Sensor#record(double)} calls;
   * but additional user-customized metrics will not be managed by {@link StreamsMetrics}.
   *
   * @param scopeName          name of the scope, which will be used as part of the metrics type, e.g.: "stream-[scope]-metrics".
   * @param entityName         name of the entity, which will be used as part of the metric tags, e.g.: "[scope]-id" = "[entity]".
   * @param operationName      name of the operation, which will be used as the name of the metric, e.g.: "[operation]-latency-avg".
   * @param recordingLevel     the recording level (e.g., INFO or DEBUG) for this sensor.
   * @param tags               additional tags of the sensor
   * @return The added sensor.
   */
  Sensor addRateTotalSensor(final String scopeName,
                            final String entityName,
                            final String operationName,
                            final Sensor.RecordingLevel recordingLevel,
                            final String... tags);

Users can create a sensor via either `addLatencyRateTotalSensor` or `addRateTotalSensor`, which will be pre-registered with the latency / rate / total metrics already; more metrics can then be added to the returned sensors in addition to the pre-registered ones. When recording a value to the sensor, users should just use `Sensor#record()` directly on the sensor itself.
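To make the naming scheme from the javadocs concrete, here is a minimal, self-contained sketch. `LatencyRateTotalSketch` is a hypothetical stand-in and not a Kafka class: it only mirrors how `scopeName`, `entityName` and `operationName` map to the metrics type, tag and metric names, and how each `record()` call counts as one invocation. The `-latency-max`, `-rate` and `-total` suffixes are assumed by analogy with the documented `[operation]-latency-avg` example, and the rate is simplified to count divided by elapsed time rather than a windowed rate.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in (not part of Kafka) mirroring the documented naming scheme. */
final class LatencyRateTotalSketch {
    final String group;                                    // "stream-[scope]-metrics"
    final Map<String, String> tags = new LinkedHashMap<>(); // "[scope]-id" = "[entity]" plus extra tags
    private final String operationName;
    private long count;                                     // one per record() call
    private double sum;
    private double max = Double.NEGATIVE_INFINITY;

    LatencyRateTotalSketch(String scopeName, String entityName, String operationName, String... extraTags) {
        if (extraTags.length % 2 != 0) {
            throw new IllegalArgumentException("tags must be given as key-value pairs");
        }
        this.group = "stream-" + scopeName + "-metrics";
        this.tags.put(scopeName + "-id", entityName);
        for (int i = 0; i < extraTags.length; i += 2) {
            this.tags.put(extraTags[i], extraTags[i + 1]);
        }
        this.operationName = operationName;
    }

    /** Each recorded latency counts as one invocation and updates avg / max. */
    void record(double latency) {
        count++;
        sum += latency;
        max = Math.max(max, latency);
    }

    /** Simplified snapshot: rate is count / elapsedSeconds, not a windowed rate. */
    Map<String, Double> snapshot(double elapsedSeconds) {
        Map<String, Double> m = new LinkedHashMap<>();
        m.put(operationName + "-latency-avg", count == 0 ? Double.NaN : sum / count);
        m.put(operationName + "-latency-max", count == 0 ? Double.NaN : max);
        m.put(operationName + "-rate", count / elapsedSeconds);   // assumed suffix
        m.put(operationName + "-total", (double) count);          // assumed suffix
        return m;
    }

    public static void main(String[] args) {
        LatencyRateTotalSketch sensor = new LatencyRateTotalSketch("my-scope", "entity-1", "lookup");
        sensor.record(2.0);
        sensor.record(4.0);
        System.out.println(sensor.group + " " + sensor.tags + " " + sensor.snapshot(2.0));
    }
}
```

With the proposed API itself, the equivalent steps would be a call to `addLatencyRateTotalSensor(scopeName, entityName, operationName, recordingLevel)` on `StreamsMetrics`, followed by `Sensor#record(double)` on the returned sensor.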

Streams Built-in Metrics

For Streams built-in metrics, we will clean them up by 1) adding a few instance-level metrics, 2) removing a few non-useful or functionally overlapping metrics, and 3) changing the recording level of some metrics. Note the symbol tags in the tables below (the descriptions of the metrics are omitted since their semantics are straightforward given the names "rate", "total", "max", "avg", "static gauge", etc.).

$: newly added

! : breaking changes

* : the sensors are created lazily

(→) : parent sensor





Per-Processor-Node Per-State-Store Per-Cache



(! tag name changed)


(! tag name changed)


(! tag name changed)


(! tag name changed)


(! tag name changed)

version | commit-id (static gauge)
INFO ($)

application-id (static gauge)
INFO ($)

topology-description (static gauge)
INFO ($)

state (dynamic gauge)
INFO ($)

alive-stream-threads (dynamic gauge)
INFO ($)

process-latency (avg | max)

INFO
DEBUG
(! removed for now)

process (rate | total)

INFO
DEBUG ( → ) on source-nodes only
DEBUG on source-nodes only

punctuate-latency (avg | max)


punctuate (rate | total)


commit-latency (avg | max)


commit (rate | total)


poll-latency (avg | max)


poll (rate | total)


process | punctuate | commit | poll-ratio (dynamic gauge)


task-created | closed (rate | total)


poll-records (avg | max)


process-records (avg | max)


active-process-ratio (dynamic gauge)

INFO ($) (percentage of time the hosting thread is spending with this active task)

standby-process-ratio (dynamic gauge)

INFO ($) (percentage of time the hosting thread is spending with this standby task)

dropped-records (rate | total)

INFO ($) (number of records dropped within this task due to all kinds of scenarios)

active-buffer-count (dynamic gauge)


enforced-processing (rate | total)


record-lateness (avg | max)


suppression-emit (rate | total)

DEBUG * (suppress processor only)

suppression-buffer-size (avg | max)

DEBUG * (suppression buffer only)

suppression-buffer-count (avg | max)

DEBUG * (suppression buffer only)

(put | put-if-absent .. | get)-latency (avg | max)

DEBUG * (excluding suppression buffer)

                 (! name changed)

(put | put-if-absent .. | get) (rate)

DEBUG * (excluding suppression buffer)

                 (! name changed)

hit-ratio (avg | min | max)

DEBUG  (! name changed)

A few philosophies behind this cleanup:

  1. We will remove most of the parent sensors with `level-tag=all`, with one exception. The main idea is to let users do the roll-ups themselves only if necessary, so that we can save unnecessary metric value aggregations. In the one exceptional case, a parent-child sensor relationship is maintained because it is a bit tricky for users to do the roll-up correctly.
  2. We will keep all LEVEL-0 (instance) and LEVEL-1 (thread) sensors at the INFO recording level, and most lower-level sensors at DEBUG. The only exceptions are active/standby-task-process and dropped / skipped-records:
    1. active/standby-task-process indicate the percentage of time that the current hosting thread spends on processing the task.
    2. dropped / skipped records indicate unexpected errors during processing and hence need users' attention. Their semantics are a bit different, though: skipped records are those skipped at the very beginning of processing, so they never traverse the topology at all; dropped records are those dropped in the middle of the topology, and they do not necessarily map 1-1 to source records, since one source record may be transformed into multiple intermediate records which are then dropped later.
  3. For some metrics that are only useful for a specific type of entity, like "suppression-emit", we will create the sensors lazily in order to save metrics reporters the unnecessary cost of iterating over empty sensors.
  4. Some of the lower-level metrics like "forward-rate" and "destroy-rate" are removed directly since they overlap with other existing metrics. Here is a list of removed / replaced sensors:

Code Block
late-records-drop: INFO at processor node level, replaced by INFO task-level "dropped-records".

skipped-records: INFO at thread and processor node level, replaced by INFO task-level "dropped-records".

expired-window-record-drop: DEBUG at state store level, replaced by INFO task-level "dropped-records".

forward-rate: DEBUG at processor-node level, replaced by DEBUG processor node level "process-rate".

destroy-rate: DEBUG at processor-node level, covered by INFO thread-level "task-closed-rate".

create-rate: DEBUG at processor-node level, covered by INFO thread-level "task-created-rate".
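For anyone updating dashboards or alerts, the list above can be encoded as a simple lookup. The following is only an illustrative sketch: `RemovedSensorLookup` and `replacementFor` are hypothetical names, the mapping covers metric names only (not levels or tags), and the thread-level replacement for "create-rate" is written as "task-created-rate" on the assumption that it follows the "task-created | closed (rate | total)" metric.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: maps removed sensor names to the sensors that replace or cover them. */
final class RemovedSensorLookup {
    static final Map<String, String> REPLACEMENTS = new LinkedHashMap<>();

    static {
        // Replaced by the task-level INFO "dropped-records" sensor.
        REPLACEMENTS.put("late-records-drop", "dropped-records");
        REPLACEMENTS.put("skipped-records", "dropped-records");
        REPLACEMENTS.put("expired-window-record-drop", "dropped-records");
        // Replaced by the processor-node-level DEBUG "process-rate".
        REPLACEMENTS.put("forward-rate", "process-rate");
        // Covered by thread-level INFO task lifecycle metrics.
        REPLACEMENTS.put("destroy-rate", "task-closed-rate");
        REPLACEMENTS.put("create-rate", "task-created-rate"); // assumed spelling
    }

    /** Returns the replacement name, or the input unchanged if the sensor was not removed. */
    static String replacementFor(String sensorName) {
        return REPLACEMENTS.getOrDefault(sensorName, sensorName);
    }

    public static void main(String[] args) {
        System.out.println(replacementFor("late-records-drop"));
        System.out.println(replacementFor("commit-latency"));
    }
}
```

Note that the replacement often lives at a different level than the removed sensor, so queries that filter on level-specific tags need to change as well.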

Release History


  • client-level metrics:
    • added:
      • version 
      • commit-id
      • application-id
      • topology-description
      • state


  • thread-level metrics:
    • refactored:
      • process-latency (avg | max)
      • process (rate | total) 
      • punctuate-latency (avg | max)
      • punctuate (rate | total)
      • commit-latency (avg | max)
      • commit (rate | total)
      • poll-latency (avg | max)
      • poll (rate | total)
      • task-created | closed (rate | total)
    • removed:
      • skipped-records
  • task-level metrics:
    • refactored:
      • process-latency (avg | max)
      • process (rate | total) 
      • punctuate-latency (avg | max)
      • punctuate (rate | total)
      • commit-latency (avg | max)
      • commit (rate | total)
      • enforced-processing (rate | total)
      • record-lateness (avg | max)
    • added:
      • dropped-records (rate | total)
    • removed:
      • expired-window-record-drop
  • processor-node-level:
    • refactored:
      • process (rate | total)
      • suppression-emit (rate | total)
    • removed:
      • process-latency (avg | max)
      • late-records-drop
      • skipped-records
      • forward-rate
      • destroy-rate
      • create-rate
  • state-store-level:
    • refactored:
      • suppression-buffer-size (avg | max)
      • suppression-buffer-count (avg | max)
      • (put | put-if-absent .. | get)-latency (avg | max)
      • (put | put-if-absent .. | get) (rate)
    • removed:
      • expired-window-record-drop
  • cache-level:
    • refactored:
      • hit-ratio (avg | min | max)


  • client-level metrics:
    • added: 
      • alive-stream-threads
  • thread-level metrics:
    • added: 
      • process | punctuate | commit | poll-ratio
      • poll-records (avg | max)
      • process-records (avg | max)
  • task-level metrics:
    • added: 
      • active-process-ratio


Proposed Changes

As above.


Code Block

type: Enum 
values: {"0.10.0-2.4", "latest"}
default: "latest"

When users override it to "0.10.0-2.4", then the old metrics names / tags will still be used.
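As a rough illustration of how an application might opt back into the old metric names, the sketch below builds a `java.util.Properties` object with the override. Two things are assumptions here: the config key `built.in.metrics.version` is not shown in the text above, and the old-version value is taken to be "0.10.0-2.4" with "latest" as the default. `MetricsVersionConfigSketch` and `withOldMetricNames` are hypothetical names.

```java
import java.util.Properties;

/** Hypothetical helper showing the override described above. */
final class MetricsVersionConfigSketch {
    // Assumption: the config key is not shown in the text above.
    static final String BUILT_IN_METRICS_VERSION = "built.in.metrics.version";
    // Assumption: the value that selects the old metric names / tags.
    static final String OLD_METRICS_VERSION = "0.10.0-2.4";

    /** Returns a copy of the given properties with the old metrics version selected. */
    static Properties withOldMetricNames(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(BUILT_IN_METRICS_VERSION, OLD_METRICS_VERSION);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "my-app");
        System.out.println(withOldMetricNames(base));
    }
}
```

Leaving the key unset keeps the default, so only applications that depend on the old names / tags need this override.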
