
Status

Current state: Discussing

Discussion thread

JIRA:

Released: (probably 1.2)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Operators of Kafka Streams applications need awareness of records being skipped.

We currently present these metrics:

  • From the StreamThread:
    • MetricName[name=skipped-records-total, group=stream-metrics, description=The total number of skipped records, tags={client-id=...(per-thread client-id)}] (INFO level)
  • From the ProcessorNode:
    • MetricName[name=skippedDueToDeserializationError-rate, group=stream-processor-node-metrics, description=The average number of occurrence of skippedDueToDeserializationError operation per second., tags={task-id=..., processor-node-id=...}] (DEBUG level)
    • MetricName[name=skippedDueToDeserializationError-total, group=stream-processor-node-metrics, description=The total number of occurrence of skippedDueToDeserializationError operations., tags={task-id=..., processor-node-id=...}] (DEBUG level)
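
For context, here is a minimal sketch of how an operator might read the thread-level metric today, assuming a running KafkaStreams instance named streams (the variable name is illustrative):

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Print the current value of each thread's skipped-records-total metric.
for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    if ("skipped-records-total".equals(entry.getKey().name())) {
        System.out.println(entry.getKey().tags() + " -> " + entry.getValue().metricValue());
    }
}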

This arrangement is unsatisfying for two reasons:

  1. The thread-level metric does not actually capture "the total number of skipped records", but only a subset of them. Also, 'skippedDueToDeserializationError' is incremented both for deserialization errors and for negative timestamps. Technically speaking, these are bugs and not KIP-worthy, but I'm going to propose restructuring the metrics to make such bugs less likely in the future.
  2. There are more sources of skipped records than deserialization errors. This means we need new metrics, which is KIP-worthy.

 

A third motivation is this:

We provide the TopologyTestDriver for testing streams topologies. An application author may wish to inspect the metrics after a test, but since the driver does not construct a StreamThread, the aggregated metric is unavailable. The two ProcessorNode metrics are recorded, but it is currently not possible to access any metrics from TopologyTestDriver at all.

Proposed Public Interface Change

Restructuring metrics around skipped records

I propose to remove all the existing skipped-record metrics and replace them with the following metrics at the ProcessorNode level:

MetricName[name=skipped-records-rate, group=stream-processor-node-metrics, description=The average number of skipped records per second, tags={task-id=..., processor-node-id=..., reason=...}]
MetricName[name=skipped-records-total, group=stream-processor-node-metrics, description=The total number of skipped records, tags={task-id=..., processor-node-id=..., reason=...}]

Both of these metrics would be INFO level.

  • task-id and processor-node-id would be set as they are today
  • the new tag "reason" captures the reason the record is skipped. For example, for the skips we measure today, we would have "deserialization-error" and "negative-timestamp".
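
To make the proposal concrete, here is a hedged sketch (not the actual implementation) of how such a rate/total pair could be registered with Kafka's metrics library, carrying the reason as a tag; the tag values shown are illustrative:

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;

final Metrics metrics = new Metrics();
final Map<String, String> tags = new LinkedHashMap<>();
tags.put("task-id", "0_0");
tags.put("processor-node-id", "KSTREAM-SOURCE-0000000000");
tags.put("reason", "deserialization-error");

final Sensor sensor = metrics.sensor("skipped-records", Sensor.RecordingLevel.INFO);
sensor.add(metrics.metricName("skipped-records-rate", "stream-processor-node-metrics",
    "The average number of skipped records per second", tags), new Rate());
sensor.add(metrics.metricName("skipped-records-total", "stream-processor-node-metrics",
    "The total number of skipped records", tags), new Total());

sensor.record();  // each call records 1.0, advancing both the rate and the total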

Rationale

The existing "skipped-records-total" is provided for operator convenience, so that they don't have to deal with the more granular metrics unless they are debugging an issue.

However, the coarse metric aggregates across two dimensions simultaneously, summing all the skips across all ProcessorNodes in the StreamThread and also across all the reasons.

Note that to get a truly coarse skip count over the whole application, the operator still must sum this aggregation across all the threads and across all the processes running the application.

Given that they have to do this sum to get a usable top-level view, I argue that we should not bother with the intermediate INFO-level aggregation, and instead expose the more granular information, which is currently hidden by default (DEBUG level).

To assist in performing this aggregation, I propose to give all the skipped-record metrics a uniform name and denote the reason in a tag. This makes it trivially easy to discover all the skipped-record metrics. Otherwise, each reason for skipping a record would have its own unique name, and operators would need to maintain a list of known skipped-record metrics. My proposal is also robust against the addition of new reasons.
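
With a uniform name, the whole-application roll-up then reduces to a single sum over the metrics registry. A minimal sketch, assuming the proposed names and the same illustrative streams instance as above (per process; the operator would still combine results across processes):

// Sum skipped-records-total across all tasks, processor nodes, and reasons.
final double totalSkipped = streams.metrics().entrySet().stream()
    .filter(e -> "skipped-records-total".equals(e.getKey().name()))
    .filter(e -> "stream-processor-node-metrics".equals(e.getKey().group()))
    .mapToDouble(e -> ((Number) e.getValue().metricValue()).doubleValue())
    .sum();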

 

Note that this formulation also conveniently makes all the skipped-record information visible in the TopologyTestDriver, since the metrics are no longer collected on the StreamThread.

TopologyTestDriver#metrics()

I propose to add the following method to the TopologyTestDriver:

public Map<MetricName, ? extends Metric> metrics()

This is the same interface presented by KafkaStreams#metrics(). As TopologyTestDriver is otherwise analogous to KafkaStreams, this addition to the interface makes sense, and it would allow access to the metrics from test code.
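
For example, test code could read one of the proposed skipped-record metrics straight off the driver. A hedged sketch, where driver is an already-constructed TopologyTestDriver:

// Total records skipped due to deserialization errors, summed across nodes.
final double deserializationSkips = driver.metrics().entrySet().stream()
    .filter(e -> "skipped-records-total".equals(e.getKey().name()))
    .filter(e -> "deserialization-error".equals(e.getKey().tags().get("reason")))
    .mapToDouble(e -> ((Number) e.getValue().metricValue()).doubleValue())
    .sum();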

Compatibility, Deprecation, and Migration Plan

We have a choice whether to remove or deprecate the existing metrics. I have no strong opinion, although I lean toward immediate removal.

Since deprecating a metric produces no compiler warnings, I think it's likely that "deprecating" the existing metrics would go unnoticed, and the metrics disappearing would be just as much of a surprise at the end of the deprecation period as it would be immediately.

A deprecation period would really just create a window of ambiguity in which both old and new metrics are available, making things more confusing for operators and increasing the chances that people would couple to the wrong (deprecated) metrics.

Test Plan

We would create a new test probing the various reasons for skipping records and verifying the metrics reflect them.
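
As one hedged illustration of such a test, a negative-timestamp skip could be provoked through the TopologyTestDriver, assuming the topology under test is configured with the existing LogAndSkipOnInvalidTimestamp extractor (so that an invalid timestamp is skipped rather than fatal) and an illustrative topic name:

import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

final ConsumerRecordFactory<String, String> factory =
    new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());

// Pipe a record with a negative timestamp; it should be skipped, not processed.
driver.pipeInput(factory.create("input-topic", "key", "value", -1L));

// The metric lookup shown in the TopologyTestDriver#metrics() section above can then
// assert that a skip was recorded with reason=negative-timestamp.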

Rejected Alternatives

We could instead just make the minimal change of correcting the acknowledged bugs and adding the missing measurement points. But I think that we have an opportunity to improve the ergonomics of the metrics for operators while also addressing the current issues.
