Current state: Approved
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, Kafka's MessagesInPerSec metric is a topic-level metric. It is emitted without partition information (see ReplicaManager). This metric provides vital information about topic throughput and is commonly used for topic quota and partition management. However, if a topic's traffic is not balanced, the overall throughput gives no insight into the imbalance. This is particularly problematic for keyed topics, where messages may be produced with hotspots in the key distribution and overload only certain partitions. In this case:
There have been many attempts on this topic, and some even reached the PR stage (PR#13571). Unfortunately, none of them was merged.
The major reason why the PRs were not merged and the KIPs were left in discussion is that the metrics may fan out too much. Consider a typical production Kafka cluster today (here we use Uber as an example): topics have at least 4-8 partitions and, in the most extreme cases, a topic may have 256 partitions. This means the fan-out rate is at least 4-8x and can go as high as 256x. Reporting throughput with partition information places a higher burden on the metrics emission and collection infrastructure if we opt in blindly. To control this behavior, we propose the following changes.
metrics.verbosity
MessagesInPerSec, BytesInPerSec, and BytesOutPerSec
`metrics.verbosity` will be a new dynamic config that controls the verbosity (fan-out rate) of the metrics. It is a JSON-formatted config specifying the conditions that control metric fan-out. If the value `high` is set for the `level` key of the configured JSON (see below for example values), high fan-out tags (e.g. `partition`) will be added to the metrics selected by the `names` filter, for all topics that meet the conditions in the `filters` section. Under the `low` setting, these tags are assigned an empty value. We chose to make this control central so that the implementation can later be generalized into a library or allow other means of centralized control.
To further extend the control for future cases, the metric names and topics can be controlled in this single configuration too (see details in the JSON Config Format section).
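The semantics described above can be sketched as follows. This is a minimal illustration, not the KIP's implementation: the class and method names (`MetricsVerbositySketch`, `VerbosityRule`, `partitionTag`) are hypothetical, the rules are assumed to be already parsed from the JSON value, and full-match regex semantics are an assumption.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch: class names and matching semantics are assumptions.
final class MetricsVerbositySketch {

    /** One already-parsed entry of the metrics.verbosity JSON array. */
    static final class VerbosityRule {
        final String level;          // "low" or "high"
        final Pattern names;         // regex over metric names
        final List<Pattern> topics;  // regexes over topic names; empty applies to nothing

        VerbosityRule(String level, Pattern names, List<Pattern> topics) {
            this.level = level;
            this.names = names;
            this.topics = topics;
        }

        boolean appliesTo(String metricName, String topic) {
            return names.matcher(metricName).matches()
                && topics.stream().anyMatch(p -> p.matcher(topic).matches());
        }
    }

    /** Returns the partition tag value: the partition id under "high", otherwise empty. */
    static String partitionTag(List<VerbosityRule> rules, String metricName, String topic, int partition) {
        for (VerbosityRule rule : rules) {
            if ("high".equals(rule.level) && rule.appliesTo(metricName, topic)) {
                return Integer.toString(partition);
            }
        }
        return ""; // low verbosity: the tag keeps an empty value
    }

    public static void main(String[] args) {
        List<VerbosityRule> rules = List.of(new VerbosityRule(
            "high",
            Pattern.compile("Bytes.*"),
            List.of(Pattern.compile("avro-topic-car"))));
        System.out.println(partitionTag(rules, "BytesInPerSec", "avro-topic-car", 3));        // prints 3
        System.out.println(partitionTag(rules, "BytesInPerSec", "other-topic", 3).isEmpty()); // prints true
    }
}
```

The sketch does not attempt to resolve conflicts between entries; it only shows how a `high` entry turns an empty tag into a partition-level one.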
Type | String(regex expression) |
Default | "[]" |
Valid Values / Examples | A string representing a JSON array, details below. The above config means the |
Importance | medium |
Update Mode | Dynamic Read & Write. NOTE: To prevent unexpected behavior, dynamic configuration will not persist and the permanent value will be used once the broker is restarted. |
The JSON array mentioned above should have JSON objects configuring the corresponding metrics verbosity details at each level. Here are the valid keys in the JSON object, and their corresponding value details.
Key Name | Default Value | Valid Values | Restrictions | Example |
---|---|---|---|---|
level | low | NOTE: We don't have any medium-level metrics as of now. We leave this option open for future extension. | JSON entries are validated in no particular order, i.e. they are order-irrelevant. The entries can be duplicated, but must not conflict with each other. If there's a conflict between values configured with different metrics level settings, the one that appears first in the sequence NOTE: It's okay to have multiple entries of | "level": "high" |
names | .* | Pattern corresponding to the names of the metric series in scope | "names": "Bytes *" | |
filters | [] (empty JSON array, applies to nothing) |
or
|
|
or
|
All regex patterns mentioned above should be java.util.regex-compatible; see the documentation of Java's regex engine for more details.
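Because these patterns are regular expressions rather than globs, a trailing `*` applies only to the preceding character. The sketch below uses only `java.util.regex` to show the difference. Whether the broker would apply full-match (`matches`) or substring (`find`) semantics is not stated here, so both are shown as assumptions, and the helper names are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical helper class illustrating java.util.regex behavior.
final class RegexPatternSketch {
    // Full-match semantics: the whole input must match the pattern.
    static boolean fullMatch(String regex, String input) {
        return Pattern.compile(regex).matcher(input).matches();
    }

    // Substring semantics: any part of the input may match the pattern.
    static boolean partialMatch(String regex, String input) {
        return Pattern.compile(regex).matcher(input).find();
    }

    public static void main(String[] args) {
        // "Bytes*" means "Byte" followed by zero or more "s" characters.
        System.out.println(fullMatch("Bytes*", "BytesInPerSec"));    // prints false
        System.out.println(fullMatch("Bytes.*", "BytesInPerSec"));   // prints true
        System.out.println(partialMatch("Bytes*", "BytesInPerSec")); // prints true
        System.out.println(fullMatch("avro-.*", "avro-topic-car"));  // prints true
    }
}
```

Under full-match semantics, a pattern such as `Bytes.*` or `avro-.*` is the form that selects every name with the given prefix.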
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers
metrics.verbosity=
[]
Expected behavior: verbosity level low is applied to all metric series and no high fan-out tags are added.
MessagesInPerSec
metrics.verbosity=
[{"level": "low", "names": ".*"}]
Expected behavior: verbosity level low is applied to all metric series and no high fan-out tags are added.
metrics.verbosity=[{
"level": "high",
"names": "Bytes*",
"filters": [{"topics": ["avro-topic-car", "avro-topic-bus", "avro-topic-plane"]}]
}]
Expected behavior: BytesInPerSec, BytesOutPerSec, and BytesRejectedPerSec will include partition-level values for the 3 listed topics.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'metrics.verbosity=[{"level": "high", "names": ".*", "filters": [{"topics": ["avro-*"]}]}]'
Expected behavior after the change: partition-level values will be added to all 10 metric series for all topics matching the name regex `avro-*`.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'metrics.verbosity=[{"level": "low", "names": ".*"}]'
The empty-tag filtering logic ensures that empty tags are not included in the final metrics sent to the exporter. This keeps the default setting free of additional performance cost, but it could be an issue when a tag is dynamically added or removed. Certain metrics exporters (e.g. Prometheus) combine labels (tags) into the name, so different label sets may map to different metrics and may fail validation on the exporter side. To solve this issue, the filtering logic can be removed. With that change, empty tags will end up in the metrics, but this does not increase the fan-out rate and will not bring a big performance hit to brokers under the default mode.
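To make the trade-off concrete, here is a minimal sketch of the two behaviors. It is not Kafka's actual filtering code: the class name `EmptyTagFilterSketch`, the method `effectiveTags`, and the `dropEmpty` switch are hypothetical and exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of empty-tag filtering; not Kafka's actual code.
final class EmptyTagFilterSketch {
    // Copies the tags in order, optionally dropping entries whose value is empty.
    static Map<String, String> effectiveTags(Map<String, String> tags, boolean dropEmpty) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : tags.entrySet()) {
            if (dropEmpty && e.getValue().isEmpty()) {
                continue; // filtered: the label set shrinks
            }
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> low = new LinkedHashMap<>();
        low.put("topic", "avro-topic-car");
        low.put("partition", ""); // low verbosity: empty value
        System.out.println(effectiveTags(low, true).keySet());  // prints [topic]
        System.out.println(effectiveTags(low, false).keySet()); // prints [topic, partition]
    }
}
```

With filtering on, the label set changes when the verbosity is switched. With filtering off, the label set stays `[topic, partition]` in both modes, which is the stability that exporters such as Prometheus rely on.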
For the purpose of this KIP, only 3 metrics will have a high fan-out tag. However, if many metrics start to add high fan-out tags, switching the level could lead to a burst of new metric series and cause performance degradation. To resolve this issue, we propose additional control over which metrics allow high verbosity. This is outside the scope of this KIP and shall be resolved in a follow-up effort.
Currently, to record the messages-in-rate, the API is invoked in this way:
|
After the change, it should be
|
Inside BrokerTopicStats, the topic-level meter will be kept as it is. In addition, there will be a map of meters for partition-level metrics.
|
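The idea of keeping the topic-level meter and adding a map of partition-level meters can be sketched as follows. This is an illustration under stated assumptions, not the proposed Kafka code: `PartitionMeterSketch` is a hypothetical stand-in for BrokerTopicStats, a `LongAdder` counter replaces a real rate meter, and the `partitionLevel` flag stands in for the verbosity decision.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for BrokerTopicStats; LongAdder replaces a real rate meter.
final class PartitionMeterSketch {
    private final Map<String, LongAdder> topicMessagesIn = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> partitionMessagesIn = new ConcurrentHashMap<>();

    // Always records at topic level; records at partition level only when enabled.
    void markMessagesIn(String topic, int partition, long count, boolean partitionLevel) {
        topicMessagesIn.computeIfAbsent(topic, t -> new LongAdder()).add(count);
        if (partitionLevel) {
            partitionMessagesIn
                .computeIfAbsent(topic + "-" + partition, k -> new LongAdder())
                .add(count);
        }
    }

    long topicCount(String topic) {
        LongAdder adder = topicMessagesIn.get(topic);
        return adder == null ? 0L : adder.sum();
    }

    long partitionCount(String topic, int partition) {
        LongAdder adder = partitionMessagesIn.get(topic + "-" + partition);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        PartitionMeterSketch stats = new PartitionMeterSketch();
        stats.markMessagesIn("avro-topic-car", 0, 5, true);
        stats.markMessagesIn("avro-topic-car", 1, 7, false);
        System.out.println(stats.topicCount("avro-topic-car"));        // prints 12
        System.out.println(stats.partitionCount("avro-topic-car", 0)); // prints 5
        System.out.println(stats.partitionCount("avro-topic-car", 1)); // prints 0
    }
}
```

Partition-level entries are created lazily, so topics that never enable high verbosity add no entries to the second map.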
The new class BrokerTopicPartitionMetrics will hold the following partition-level metrics. We are keeping it simple here to reduce the overhead associated with this change. Currently, only the metrics closely related to producers and consumers are listed. We will always have the option to add more metrics, e.g. replication, if needed.
Metrics Name | Meaning |
| Messages entering the partition, per second |
| Bytes entering the partition, per second |
| Bytes retrieved from the partition, per second |
| Bytes rejected for exceeding the max message size in a partition, per second |
| Produce request count for a partition, per second |
| Fetch request count for a partition, per second |
| Failed produce request count for a partition, per second |
| Failed fetch request count for a partition, per second |
| Broker-side conversions (decompressions) for a partition, per second |
| Broker-side conversions (compressions) for a partition, per second |
If the flag is not set, its default value will be `default`. This means that in the event of a broker version upgrade or downgrade, the behavior will not change.
Since we do not dynamically adjust the tag list, we don't expect incompatibility with existing metrics exporters. This will also be verified in the integration tests.
brokerTopicStats
The following 3 tests will be done for common metrics collectors: JMX, Prometheus, and OpenTelemetry.
To better predict the performance penalty introduced by this KIP, we plan to run the following test cases in the following 8 setups.
Note: all setups will be tested for both cold-start enabling and dynamic enabling.
The purpose of this test is to ensure this KIP won't bring performance drawbacks if the feature is not enabled.
Expected result: minimal, if any, performance (CPU, memory, network, disk) impact on other broker functionality under all 8 scenarios.
The purpose of this test is to ensure that, when the feature is enabled for debugging purposes, it won't destabilize the broker too much. It will also help to answer:
Search dimensions
The RecordingLevel config also controls the verbosity of the metrics emitted by various Kafka components. However, we decided not to build on top of this config for the following reasons:
To further reduce the overall duplication and move towards unified control, we propose the following change with this KIP to merge/clean up the usage of RecordingLevel:
RecordingLevel.TRACE can be defined as {"level":"high", "names":"<trace-level-metrics>"}
metrics.verbosity. Note: This will bring the config to the client side; it is currently a server-side-only config.
To control the overall fan-out of the added tag, we can add an allowlist for the topics that have this feature enabled. We would tentatively name it metrics.partition.level.reporting.topics. This added configuration will contain the topics that will have partition-level metrics reported.
Type | list |
Default | [] (empty list) |
Valid Values | [topic1, topic2,...] |
Importance | low |
Update Mode |
|
Reasons why this is not selected: