
Status

Current state: Adopted

Discussion thread: http://mail-archives.apache.org/mod_mbox/kafka-dev/201803.mbox/browser

JIRA: KAFKA-6376

PR: https://github.com/apache/kafka/pull/4812

Released: 2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Restructuring metrics around skipped records

I propose to remove all the existing skipped-record metrics and replace them with only the following metrics at the StreamThread level:

MetricName[name=skipped-records-rate, group=stream-metrics, description=The average number of skipped records per second, tags={client-id=...(per-thread client-id)}]
MetricName[name=skipped-records-total, group=stream-metrics, description=The total number of skipped records, tags={client-id=...(per-thread client-id)}]

Both of these metrics would be INFO level.

  • the client-id tag is the same per-thread client id used by the existing StreamThread-level metrics
  • the reason for each skip (for example, a deserialization error or a negative timestamp) is reported in the WARN-level log described below, rather than in a metric tag
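
To make the monitoring interface concrete, here is a minimal sketch (not part of the proposal) of how an operator-side check might read the proposed metrics from a running KafkaStreams instance via the existing KafkaStreams#metrics() accessor. The class and helper names are made up for the example; the metric and group names are the ones proposed above.

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class SkippedRecordsCheck {

    // Hypothetical helper: sums the proposed skipped-records-total metric over all
    // StreamThreads of one KafkaStreams instance (one metric per per-thread client-id).
    public static double skippedRecordsTotal(final KafkaStreams streams) {
        double sum = 0.0;
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            final MetricName name = entry.getKey();
            if ("stream-metrics".equals(name.group()) && "skipped-records-total".equals(name.name())) {
                sum += ((Number) entry.getValue().metricValue()).doubleValue();
            }
        }
        return sum;
    }
}

Because the name is uniform and the only tag is the per-thread client id, the same loop works unchanged no matter how many threads the instance runs.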

Rationale

The existing "skipped-records-total" metric is there for operators' convenience, so that they don't have to deal with the more granular metrics unless they are debugging an issue.

However, the coarse metric aggregates across two dimensions simultaneously, summing all the skips across all ProcessorNodes in the StreamThread and also across all the reasons.

Note that to get a truly coarse skip count over the whole application, the operator still has to sum this aggregation across the threads and across the processes they are running.

Given that operators have to do this sum anyway to get a usable top-level view, I argue that a single, uniformly named pair of skipped-record metrics per thread is the right monitoring interface, rather than a collection of more granular metrics that are hidden by default (at DEBUG level).

Giving all the skipped-record metrics a uniform name makes it trivially easy to discover and aggregate them. Otherwise, each reason for skipping a record would have its own unique metric name, and operators would need to maintain a list of known skipped-record metrics; a uniform name is also robust against the addition of new reasons for skipping. The per-reason detail is instead surfaced in the WARN-level logs described below.
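
To illustrate the kind of aggregation operators already perform, here is a hypothetical sketch that sums the proposed skipped-records-total attribute over all StreamThreads in one JVM via JMX. It assumes Kafka's default JmxReporter exposes the proposed metrics under kafka.streams:type=stream-metrics, as it does for the existing thread-level metrics; aggregating across processes would still be the job of the operator's monitoring system.

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class SkippedRecordsJmxSum {

    public static void main(final String[] args) throws Exception {
        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();

        // One MBean per StreamThread; the uniform metric name keeps the query trivial.
        final ObjectName pattern = new ObjectName("kafka.streams:type=stream-metrics,client-id=*");

        double total = 0.0;
        for (final ObjectName mbean : server.queryNames(pattern, null)) {
            total += ((Number) server.getAttribute(mbean, "skipped-records-total")).doubleValue();
        }
        System.out.println("Skipped records in this JVM: " + total);
    }
}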


Instead of also maintaining more granular DEBUG-level skip metrics (such as "skippedDueToDeserializationError"), we will capture useful details about the record that got skipped (topic, partition, offset) as well as the reason for the skip ("deserialization error", "negative timestamp", etc.) in a WARN-level log.


This provides a slim and clear interface (only one metric to monitor), while still exposing the granular information for those who need to track down the reason for the skips. In fact, log messages are better suited to this kind of debugging, since we can include contextual information about the record that got skipped. This is also a common division of responsibility between the two information sources: metrics are for monitoring, logs are for debugging. Note that the skipped-record information also becomes visible in tests via the new TopologyTestDriver#metrics() method described below.

TopologyTestDriver#metrics()

I propose to add the following method to the TopologyTestDriver:

...
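
As a rough usage sketch (not part of the proposed API itself), a test could read the skipped-record information through this accessor. The sketch assumes the method mirrors KafkaStreams#metrics() and returns Map<MetricName, ? extends Metric>, and that the skipped-record sensors are registered in the driver's metrics registry; the topology, topic names, and configs are placeholders.

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;

public class SkippedRecordsDriverSketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output"); // trivial pass-through topology

        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "skipped-records-sketch");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);
        try {
            // ... pipe test records (e.g. one with an unparseable value or a negative
            // timestamp) through the driver here ...

            // Read the skipped-record metrics via the proposed accessor.
            for (final Map.Entry<MetricName, ? extends Metric> entry : driver.metrics().entrySet()) {
                if (entry.getKey().name().startsWith("skipped-records")) {
                    System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
                }
            }
        } finally {
            driver.close();
        }
    }
}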

We could instead just make the minimal change of correcting the acknowledged bugs and adding the missing measurement points. But I think that we have an opportunity to improve the ergonomics of the metrics for operators while also addressing the current issues.

We also discussed keeping one metric per skip reason, but that felt too granular. Plus, having those metrics at DEBUG level creates a visibility problem: (a) people cannot discover the metrics just by looking through the available metrics at run time, and (b) if an unexpected skip is reported, operators may turn on DEBUG but are not guaranteed to ever see another skip. Logging the granular information is the alternative we settled on.

We discussed including some built-in rollup mechanism to produce aggregated metrics at the top level. This is difficult for us to do well from within the Streams library, since it would involve each instance observing the metrics of all the other instances. We could create some kind of tool to do metric aggregation, but that seems out of scope for this KIP. If metric aggregation turns out to be a general need the community raises, we'll tackle it in a separate KIP. This decision lets us focus, within the library, on surfacing the right information at the right level of granularity.

We discussed moving the skipped-records metrics to the StreamTask. This would have the advantage that they would automatically be included in the TopologyTestDriver, but the disadvantage that it would increase the total number of metrics to pay attention to. The StreamThread is the highest level of summary that we can provide without introducing lock contention while updating the metrics.