...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Kafka's MessagesInPerSec metric is a topic-level metric. It is emitted without partition information (see ReplicaManager). This metric provides vital information about topic throughput and is commonly used for topic quota and partition management. However, if a topic's traffic is not balanced across partitions, the overall throughput gives no insight into the imbalance. This is particularly problematic for keyed topics, where messages may concentrate on hot spots in the key space and overload only certain partitions. In this case:

...

Part 1: Verbosity Level Control

metrics.verbosity will be a new dynamic config introduced to control the verbosity (fan-out rate) of the metrics. It is a JSON-formatted config specifying the conditions that control metric fan-out. If the value high is set for the level key of the configured JSON (see below for example values), high fan-out tags (e.g. partition) will be added to the metrics specified by the name filter, for all topics that meet the conditions in the filters section. In the low setting, these tags will be assigned an empty value. We elected to make this control central so that the implementation can be generalized in the future, either into a library or to allow other means of centralized control.
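As a rough illustration of the tagging behavior described above, the following Java sketch builds the tag set for a single metric sample. The class name FanOutTags, the tag keys topic and partition, and the treatment of medium as equivalent to low are assumptions made for illustration only; the text above specifies just the high and low cases.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FanOutTags {
    enum Level { LOW, MEDIUM, HIGH }

    // Builds the tags for one metric sample (hypothetical helper).
    // Assumption: only HIGH carries the real partition id; every other
    // level keeps the tag present but assigns it an empty value.
    static Map<String, String> tagsFor(Level level, String topic, int partition) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("topic", topic);
        tags.put("partition", level == Level.HIGH ? Integer.toString(partition) : "");
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(tagsFor(Level.HIGH, "avro-topic-car", 3));
        System.out.println(tagsFor(Level.LOW, "avro-topic-car", 3));
    }
}
```

Keeping the partition tag present with an empty value, rather than dropping it, means the tag set has the same shape at every verbosity level.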

...

Type

String(regex expression)

Default

"[]"

Valid Values / Examples


A string representing a JSON array, details below.
Example (actual string will be skipped):
[{
    "level": "high",
    "names": "Bytes*",
    "filters": [{"topics": ["avro-topic-car", "avro-topic-bus", "avro-topic-plane"]}]
}]

The above config means that BytesInPerSec and BytesOutPerSec will have partition-level metrics for the 3 topics avro-topic-car, avro-topic-bus, and avro-topic-plane.

Importance

medium

Update Mode

Dynamic Read & Write

...

Key Name: level
Default Value: low
Valid Values: low, medium, high
Restrictions:

JSON entries are validated in no particular order; their order is irrelevant. Entries can be duplicated, but must not conflict with each other.

If there is a conflict between values configured under different level settings, the one that appears first in the sequence [high, medium, low] takes precedence. For example, a topic configured in both low and high will be treated with the configuration values in high.

NOTE: It's okay to have multiple entries of level: high; these entries may well apply to different metric names or dimension filters.

Example: "level": "high"

Key Name: names
Default Value: .*
Valid Values: Pattern corresponding to the names of the metric series in scope
Example: "names": "Bytes*"

Key Name: filters
Default Value: [] (empty JSON array, applies to nothing)
Valid Values:

[{"topicPattern": "<topic pattern>"}]

or

[{"topics": [<a list of topics>](string array)}]

Restrictions:

  • The filters in the array are combined with AND. For example, if there are two filters A=Topic:a,b and B=Topic:a,c,d, then the rule will be applied to a only.
  • topicPattern and topics should not be used at the same time.
  • For now, the filter only accepts topics, but it can be easily extended.

Example:

[{"topicPattern": "avro-topics-*"}]

or

[{"topics": ["avro-topic-car", "avro-topic-bus", "avro-topic-plane"]}]

All regex patterns mentioned above should be java.util.regex-compatible; see Java util's regex engine for more details.
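The rules above (level precedence, AND-combined filters, and regex-based name matching) can be sketched in Java as follows. This is a minimal sketch, not the KIP's implementation: the class VerbosityResolver and all of its members are hypothetical names. It also assumes that patterns are applied with unanchored Matcher.find(), because the example pattern "Bytes*" only covers BytesInPerSec under that reading, and that a metric with no applicable rule falls back to low, in line with the default behavior described for an empty config.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;

final class VerbosityResolver {
    // Declared from lowest to highest so compareTo() reflects precedence.
    enum Level { LOW, MEDIUM, HIGH }

    // Filter of the form {"topics": [...]}: exact topic names.
    static Predicate<String> topics(String... names) {
        Set<String> allowed = new HashSet<>(Arrays.asList(names));
        return allowed::contains;
    }

    // Filter of the form {"topicPattern": "..."}.
    // Assumption: unanchored matching via find().
    static Predicate<String> topicPattern(String regex) {
        Pattern pattern = Pattern.compile(regex);
        return topic -> pattern.matcher(topic).find();
    }

    static final class Rule {
        final Level level;
        final Pattern names;
        final List<Predicate<String>> filters;

        Rule(Level level, String names, List<Predicate<String>> filters) {
            this.level = level;
            this.names = Pattern.compile(names);
            this.filters = filters;
        }

        boolean appliesTo(String metric, String topic) {
            // An empty filter array applies to nothing.
            if (filters.isEmpty()) return false;
            // Every filter must accept the topic (AND relationship).
            return names.matcher(metric).find()
                    && filters.stream().allMatch(f -> f.test(topic));
        }
    }

    // The highest level among all applicable rules wins; LOW if none apply.
    static Level resolve(List<Rule> rules, String metric, String topic) {
        Level result = Level.LOW;
        for (Rule rule : rules) {
            if (rule.appliesTo(metric, topic) && rule.level.compareTo(result) > 0) {
                result = rule.level;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(
            new Rule(Level.LOW, ".*", List.of(topicPattern("avro-topic-.*"))),
            new Rule(Level.HIGH, "Bytes*", List.of(
                topics("avro-topic-car", "avro-topic-bus"),
                topics("avro-topic-car", "avro-topic-plane"))));
        System.out.println(resolve(rules, "BytesInPerSec", "avro-topic-car"));    // HIGH
        System.out.println(resolve(rules, "BytesInPerSec", "avro-topic-bus"));    // LOW
        System.out.println(resolve(rules, "MessagesInPerSec", "avro-topic-car")); // LOW
    }
}
```

In this sketch avro-topic-car is the only topic accepted by both filters of the high rule, so it is the only one that gets partition-level BytesInPerSec; under the stricter Matcher.matches() the pattern "Bytes*" would not match BytesInPerSec at all.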

Working with the Configuration via CLI

  • Querying existing config
    • Command: bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers

    • Example Result 1 (default, if not set previously): metrics.verbosity=[]
      Expected behavior: verbosity level low is applied to all metric series and no high fan-out tags are added.
    • Example Result 2: metrics.verbosity=[{"level": "low", "names": ".*"}]
      Expected behavior: verbosity level low is applied to all metric series and no high fan-out tags are added.
    • Example Result 3:
      metrics.verbosity=[{
          "level": "high",
          "names": "Bytes*",
          "filters": [{"topics": ["avro-topic-car", "avro-topic-bus", "avro-topic-plane"]}]
      }]
      Expected behavior: BytesInPerSec, BytesOutPerSec, and BytesRejectedPerSec will include partition-level values for the 3 listed topics.

  • Setting new config
    • Example Command 1 (add config): bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config metrics.verbosity=[{"level": "high", "names": ".*", "filters": [{"topicPattern": "avro-*"}]}]
      Expected behavior after the change: partition-level values will be added to all 10 metric series for all topics whose names match the regex `avro-*`.

    • Example Command 2 (alter config): bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config metrics.verbosity=[{"level": "low", "names": ".*"}]
      Expected behavior after the change: partition-level values will be removed from all metrics for all topics.

Performance Consideration

...

Metrics Name

Meaning

MessagesInPerSec

Messages entering the partition, per second

BytesInPerSec

Bytes entering the partition, per second

BytesOutPerSec

Bytes retrieved from the partition, per second

BytesRejectedPerSec

Bytes rejected for exceeding the max message size in a partition, per second

TotalProduceRequestsPerSec

Produce request count for a partition, per second

TotalFetchRequestsPerSec

Fetch request count for a partition, per second

FailedProduceRequestsPerSec

Failed produce request count for a partition, per second

FailedFetchRequestsPerSec

Failed fetch request count for a partition, per second

FetchMessageConversionsPerSec

Broker-side conversions (de-compressions) for a partition, per second

ProduceMessageConversionsPerSec

Broker-side conversions (compressions) for a partition, per second


Compatibility, Deprecation, and Migration Plan

...

  • How many topic-partitions can have this feature enabled without significantly overloading the broker?
  • What is the worst performance impact if this feature is enabled under the most severe cases?
    • Most severe cases: high traffic broker, high partition count, low metrics emission interval, high percentage of topic-partitions with this feature enabled

...

  • Topic-partitions: 10% - 100% of topic-partitions of the broker, 10% step
  • Metrics: 1-3 of the names listed in the Metrics Name table
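To make the size of this test matrix concrete, the following Java sketch enumerates the combinations of the two dimensions listed above. It assumes a full cross product of these two dimensions only; any other dimensions of the test plan are not modeled, and the class name BenchmarkGrid is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class BenchmarkGrid {
    // Returns {percentOfTopicPartitions, metricCount} pairs.
    // Assumption: every percentage is combined with every metric count.
    static List<int[]> cases() {
        List<int[]> out = new ArrayList<>();
        for (int pct = 10; pct <= 100; pct += 10) {          // 10% - 100%, 10% step
            for (int metrics = 1; metrics <= 3; metrics++) { // 1-3 metric names
                out.add(new int[] {pct, metrics});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (int[] c : cases()) {
            System.out.println(c[0] + "% of topic-partitions, " + c[1] + " metric(s)");
        }
        System.out.println("total cases: " + cases().size());
    }
}
```

Under this assumption the grid has 10 x 3 = 30 cases per broker configuration.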

Rejected Alternatives

Using the Existing RecordingLevel Config

The RecordingLevel config also controls the verbosity of the metrics emitted by various Kafka components. However, we decided not to build on top of this config for the following reasons:

  1. This config currently has only one string value and can only control metrics emission at the Sensor level. This does not meet our need for detailed fan-out rate control.
  2. This config has to be applied to all new configs if we want to use it to control the recording level. This means we first need to create a separate sensor, which introduces unnecessary duplications.
  3. This config lacks the dynamic adjustment capability demanded by this KIP.

To further reduce overall duplication and move towards unified control, this KIP proposes the following changes to merge/clean up the usage of RecordingLevel:

  1. The recording levels can be mapped to the config values defined above. For example, RecordingLevel.TRACE can be defined as {"level":"high", "names":"<trace-level-metrics>"}
  2. The existing metric sensors can be updated to remove the RecordingLevel and accept the control of metrics.verbosity. Note: This will bring the config to the client side; it is currently a server-side-only config.

Topic Allow Listing for Partition-Level Reporting

...