Discussion thread: TBD

JIRA: KAFKA-6820, KAFKA-6819, KAFKA-7963, KAFKA-5676

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Community feedback on operating Kafka Streams shows that we lack several key metrics for the following tasks:

  • Monitoring: users build UI consoles that display key metrics 24/7. Only the most critical high-level health and status metrics are shown there (e.g. instance state, thread state). Alert triggers are usually set on thresholds for these metrics (e.g. skip-record > 0, consume-latency > 10k, etc.).
  • Information: this can be considered part of the monitoring category as well, but it covers a different kind of metric. Such information could include, for example, the Kafka version, the application version (the same appId may evolve over time), num.tasks hosted on an instance, num.partitions subscribed on clients, etc. These are mostly static gauges; users would not normally build a console for them, but commonly query their values during operational tasks.
  • Debugging: when an issue is discovered, users need to look at finer-grained metrics. In other words, these are queried less frequently than those in the second category.
  • Programmables: sometimes users want to query the metrics programmatically, either inside their JVMs or from side-cars colocated with the application, with additional reporting logic on top.

For the above purposes, we want to 1) clean up the Streams built-in metrics so that more useful metrics are available out of the box while the non-useful ones are trimmed, and 2) improve the APIs for user-customized metrics that let users register their own metrics based on the "operationName / scopeName / entityName" notions; we would simplify this interface for users' needs and make sure it functions correctly.

Public Interfaces

StreamsMetrics Interface

First, for the user-customizable metrics APIs, here are the proposed changes to the `StreamsMetrics` interface:

Code Block
languagejava
// deprecated APIs: use {@link Sensor#record(double)} directly instead.

@Deprecated
void recordLatency(final Sensor sensor, final long startNs, final long endNs);

@Deprecated
void recordThroughput(final Sensor sensor, final long value);

// updated APIs javadocs

  /**
   * Add a latency and throughput sensor for a specific operation, which will include the following metrics:
   * <ol>
   * <li>average latency</li>
   * <li>max latency</li>
   * <li>invocation rate (num.operations / time unit)</li>
   * <li>total invocation count</li>
   * </ol>
   * Whenever a user records this sensor via {@link Sensor#record(double)} etc.,
   * it will be counted as one invocation of the operation, and hence the rate / count metrics will be updated accordingly;
   * and the recorded latency value will be used to update the average / max latency as well. The time unit of the latency can be defined
   * by the user.
   *
   * Note that you can add more metrics to this sensor after creating it, which can then be updated upon {@link Sensor#record(double)} calls;
   * but additional user-customized metrics will not be managed by {@link StreamsMetrics}.
   *
   * @param scopeName          name of the scope, which will be used as part of the metrics type, e.g.: "stream-[scope]-metrics".
   * @param entityName         name of the entity, which will be used as part of the metric tags, e.g.: "[scope]-id" = "[entity]".
   * @param operationName      name of the operation, which will be used as the name of the metric, e.g.: "[operation]-latency-avg".
   * @param recordingLevel     the recording level (e.g., INFO or DEBUG) for this sensor.
   * @param tags               additional tags of the sensor
   * @return The added sensor.
   */
  Sensor addLatencyAndThroughputSensor(final String scopeName,
                                       final String entityName,
                                       final String operationName,
                                       final Sensor.RecordingLevel recordingLevel,
                                       final String... tags);


Users can create a sensor via either `addLatencyAndThroughputSensor` or `addThroughputSensor`; the returned sensor comes pre-registered with the latency / throughput metrics, and more metrics can be added to it on top of the pre-registered ones. When recording a value, users should call `Sensor#record()` directly on the sensor itself.
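To make the four pre-registered metrics concrete, here is a minimal, self-contained sketch of what such a sensor tracks on each recorded value. It is a simplified stand-in written for illustration only: the class `LatencyThroughputSketch` and its methods are hypothetical and are not Kafka's `Sensor` or `StreamsMetrics`, and the fixed-window rate is an assumption rather than the windowing Kafka actually applies.

```java
// Hypothetical, simplified model of a latency-and-throughput sensor.
// NOT Kafka's Sensor class; it only mirrors the four metrics from the javadoc:
// average latency, max latency, invocation rate, total invocation count.
final class LatencyThroughputSketch {
    private long count;                               // total invocation count
    private double sum;                               // sum of recorded latencies
    private double max = Double.NEGATIVE_INFINITY;    // largest latency seen so far

    // Analogous in spirit to Sensor#record(double): one call is one invocation.
    void record(final double latency) {
        count++;
        sum += latency;
        max = Math.max(max, latency);
    }

    long total() {
        return count;
    }

    double avgLatency() {
        return count == 0 ? Double.NaN : sum / count;
    }

    double maxLatency() {
        return count == 0 ? Double.NaN : max;
    }

    // Invocation rate over a caller-supplied window (assumption: a single fixed window).
    double ratePerSecond(final long windowMs) {
        return count / (windowMs / 1000.0);
    }

    public static void main(final String[] args) {
        final LatencyThroughputSketch sensor = new LatencyThroughputSketch();
        // The time unit is up to the caller; milliseconds are used here.
        for (final double latencyMs : new double[] {2.0, 4.0, 9.0}) {
            sensor.record(latencyMs);
        }
        System.out.println("total=" + sensor.total()
            + " avg=" + sensor.avgLatency()
            + " max=" + sensor.maxLatency()
            + " rate/s=" + sensor.ratePerSecond(1500));
    }
}
```

The point of the sketch is that a single recorded value updates both the latency statistics and the invocation count, which is why a separate `recordLatency` / `recordThroughput` pair is not needed.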

Streams Built-in Metrics

For the Streams built-in metrics, we will clean them up by 1) adding a few instance-level metrics, 2) removing a few non-useful or functionally overlapping metrics, and 3) changing the recording level of some metrics. Note the symbols used in the tables below:


$: newly added

! : breaking changes

* : the sensors are created lazily

(→) : parent sensor



| Level | Scope | TAGS |
|---|---|---|
| LEVEL 0 | Per-Client | type=stream-metrics,client-id=[client-id] |
| LEVEL 1 | Per-Thread | type=stream-thread-metrics,thread-name=[threadId] (! tag name changed) |
| LEVEL 2 | Per-Task | type=stream-task-metrics,thread-name=[threadId],task-id=[taskId] (! tag name changed) |
| LEVEL 3 | Per-Processor-Node | type=stream-processor-node-metrics,thread-name=[threadId],task-id=[taskId],processor-node-id=[processorNodeId] (! tag name changed) |
| LEVEL 3 | Per-State-Store | type=stream-state-metrics,thread-name=[threadId],task-id=[taskId],[storeType]-state-id=[storeName] (! tag name changed) |
| LEVEL 3 | Per-Cache | type=stream-record-cache-metrics,thread-name=[threadId],task-id=[taskId],record-cache-id=[storeName] (! tag name changed) |

| Metric | LEVEL 0 Per-Client | LEVEL 1 Per-Thread | LEVEL 2 Per-Task | LEVEL 3 Per-Processor-Node | LEVEL 3 Per-State-Store | LEVEL 3 Per-Cache |
|---|---|---|---|---|---|---|
| version / commit-id (static gauge) | INFO ($) | | | | | |
| application-id (static gauge) | INFO ($) | | | | | |
| topology-description (static gauge) | INFO ($) | | | | | |
| state (dynamic gauge) | INFO ($) | | | | | |
| rebalance-latency (avg / max) | INFO ($) | | | | | |
| rebalance (rate / total) | INFO ($) | | | | | |
| last-rebalance-time (dynamic gauge) | INFO ($) | | | | | |
| active-task-process (ratio) | | INFO ($) | | | | |
| standby-task-process (ratio) | | INFO ($) | | | | |
| process-latency (avg / max) | | INFO | DEBUG | (! removed for now) | | |
| process (rate / total) | | INFO | DEBUG ( → ) on source-nodes only | DEBUG | | |
| punctuate-latency (avg / max) | | INFO | DEBUG | | | |
| punctuate (rate / total) | | INFO | DEBUG | | | |
| commit-latency (avg / max) | | INFO | DEBUG | | | |
| commit (rate / total) | | INFO | DEBUG | | | |
| poll-latency (avg / max) | | INFO | | | | |
| poll (rate / total) | | INFO | | | | |
| task-created / closed (rate / total) | | INFO | | | | |
| enforced-processing (rate / total) | | | DEBUG | | | |
| record-lateness (avg / max) | | | DEBUG | | | |
| dropped-late-records (rate / total) | | | | INFO * (window processor only) (! name changed) | | |
| suppression-emit (rate / total) | | | | DEBUG * (suppress processor only) | | |
| skipped-records (rate / total) | | (! moved to lower level) | INFO * ( → ) | INFO * (few processors + record queue only) | | |
| suppression-buffer-size (avg / max) | | | | | DEBUG * (suppression buffer only) | |
| suppression-buffer-count (avg / max) | | | | | DEBUG * (suppression buffer only) | |
| expired-window-record-drop (rate / total) | | | | | DEBUG * (window store only) | |
| put / put-if-absent .. / get-latency (avg / max) | | | | | DEBUG * (excluding suppression buffer) (! name changed) | |
| put / put-if-absent .. / get (rate) | | | | | DEBUG * (excluding suppression buffer) (! name changed) | |
| hit-ratio (avg / min / max) | | | | | | DEBUG (! name changed) |
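The tag hierarchy listed above can be sketched in code: each lower level carries the tags of its parent level plus one more identifying tag. The class and method names below (`MetricTagsSketch`, `taskTags`, `processorNodeTags`, `render`) are hypothetical helpers for illustration and are not part of Kafka; only the tag keys and `type` values come from this proposal, and the sample ids are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that composes the proposed tag sets; not a Kafka API.
final class MetricTagsSketch {

    // LEVEL 2 (per-task) tags: type, thread-name, task-id.
    static Map<String, String> taskTags(final String threadId, final String taskId) {
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put("type", "stream-task-metrics");
        tags.put("thread-name", threadId);
        tags.put("task-id", taskId);
        return tags;
    }

    // LEVEL 3 (per-processor-node) tags: the task tags plus processor-node-id.
    static Map<String, String> processorNodeTags(final String threadId,
                                                 final String taskId,
                                                 final String processorNodeId) {
        final Map<String, String> tags = taskTags(threadId, taskId);
        // Overwriting an existing key keeps its original position in a LinkedHashMap.
        tags.put("type", "stream-processor-node-metrics");
        tags.put("processor-node-id", processorNodeId);
        return tags;
    }

    // Renders tags in the "key=value,key=value" form used in this document.
    static String render(final Map<String, String> tags) {
        final StringJoiner joiner = new StringJoiner(",");
        tags.forEach((key, value) -> joiner.add(key + "=" + value));
        return joiner.toString();
    }

    public static void main(final String[] args) {
        // Sample ids are invented for the example.
        System.out.println(render(taskTags("thread-1", "0_1")));
        System.out.println(render(processorNodeTags("thread-1", "0_1", "source-node")));
    }
}
```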


A few philosophies behind this cleanup:

  1. We will remove most of the parent sensors with `level-tag=all`, with two exceptions. The main idea is to let users do the roll-ups themselves, and only if necessary, so that we can avoid unnecessary metric value aggregations. In the two exceptional cases, the parent-child sensor relationship is maintained because it is a bit tricky for users to do the roll-up correctly.
  2. We will keep all LEVEL-0 (instance) and LEVEL-1 (thread) sensors at INFO, and most lower-level sensors at the DEBUG reporting level. The only exceptions are dropped-late-records and skipped-records, since they indicate unexpected errors during processing and hence need users' attention.
  3. Some of the lower-level metrics, like "forward-rate" and "destroy-rate", are removed outright since they overlap with other existing metrics.
  4. For metrics that are only useful for a specific type of entity, like "expired-window-record-drop", we will create the sensors lazily, so that metrics reporters do not pay the cost of iterating over empty sensors.
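As an illustration of the first point, a user-side roll-up could look roughly like the sketch below. It assumes the user has already collected per-state-store samples (for example from a metrics reporter); the `StoreSample` type, the `putRatePerTask` helper, and the sample values are all hypothetical and not part of Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical user-side roll-up of child metrics; not a Kafka API.
final class RollUpSketch {

    // One per-state-store sample, identified by the task-id tag and the store name.
    static final class StoreSample {
        final String taskId;
        final String storeName;
        final double putRate;

        StoreSample(final String taskId, final String storeName, final double putRate) {
            this.taskId = taskId;
            this.storeName = storeName;
            this.putRate = putRate;
        }
    }

    // Sums the per-store put rates of each task, replacing what an "all" parent sensor would report.
    static Map<String, Double> putRatePerTask(final List<StoreSample> samples) {
        final Map<String, Double> perTask = new TreeMap<>();
        for (final StoreSample sample : samples) {
            perTask.merge(sample.taskId, sample.putRate, Double::sum);
        }
        return perTask;
    }

    public static void main(final String[] args) {
        final List<StoreSample> samples = Arrays.asList(
            new StoreSample("0_0", "store-a", 10.0),
            new StoreSample("0_0", "store-b", 5.0),
            new StoreSample("0_1", "store-a", 7.0));
        System.out.println(putRatePerTask(samples));
    }
}
```

Rates and totals can be rolled up by summation like this. Averages cannot simply be averaged again; they need to be weighted by the number of recordings behind each child value.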


Proposed Changes

As above.

Compatibility, Deprecation, and Migration Plan

The Streams built-in metrics changes include metric name changes as well as tag changes (mainly because we added LEVEL-0 instance metrics on top of the original top-level LEVEL-1 thread metrics), which will break existing users whose monitoring systems are built on the old metric / tag names.

So, to give users a grace period for updating their monitoring / alerting ecosystems, I'd propose adding a config:

Code Block
"built.in.metrics.version":


type: Enum 
values: {"2.2-", "2.3"}
default: "2.3"


When users override it to "2.2-", the old metric names / tags will still be used.
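A minimal sketch of how the proposed config could be read and validated is shown below. Only the config name, its two allowed values, and its default come from this proposal; the class `BuiltInMetricsVersionSketch` and its methods are hypothetical, and this is not how Kafka's own config classes are implemented.

```java
import java.util.Properties;

// Hypothetical reader for the proposed "built.in.metrics.version" config; not a Kafka API.
final class BuiltInMetricsVersionSketch {
    static final String CONFIG = "built.in.metrics.version";
    static final String OLD_METRICS = "2.2-";
    static final String NEW_METRICS = "2.3";   // proposed default

    // Returns the configured version, falling back to the default and rejecting unknown values.
    static String resolve(final Properties props) {
        final String version = props.getProperty(CONFIG, NEW_METRICS);
        if (!OLD_METRICS.equals(version) && !NEW_METRICS.equals(version)) {
            throw new IllegalArgumentException(
                "Invalid value for " + CONFIG + ": " + version);
        }
        return version;
    }

    // True when the application asked to keep the old metric names / tags.
    static boolean usesOldMetrics(final Properties props) {
        return OLD_METRICS.equals(resolve(props));
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.setProperty(CONFIG, "2.2-");   // opt in to the old names during the grace period
        System.out.println("old metrics in use: " + usesOldMetrics(props));
    }
}
```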


Rejected Alternatives

None.