...

The following metrics will be added:

  • record-e2e-latency-max [ms]
  • record-e2e-latency-p99 [ms] (99th percentile)
  • record-e2e-latency-p90 [ms] (90th percentile)
  • record-e2e-latency-min [ms]

These will be exposed on the task-level and processor-node-level at the recording level INFO with the following tags:

...

  • type = stream-processor-node-metrics
  • thread-id=[threadId]
  • task-id=[taskId]
  • processor-node-id=[processorNodeId]

In all cases the metrics will be computed at the end of the operation or subtopology. In the case of task-level metrics, for example, this means the metric reflects the end-to-end latency at the time the record leaves the sink node.
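As a usage illustration (not part of the proposal), the snippet below sketches how a user could look up the new metrics programmatically, assuming the metric names and the stream-processor-node-metrics group proposed above; the KafkaStreams#metrics() lookup itself is existing public API.

    import java.util.Map;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    public class E2eLatencyMetricsReader {

        // Print every proposed record-e2e-latency metric along with its task and processor-node tags.
        // The metric names and group are the ones proposed above; KafkaStreams#metrics() is existing API.
        public static void printE2eLatencyMetrics(final KafkaStreams streams) {
            final Map<MetricName, ? extends Metric> metrics = streams.metrics();
            for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
                final MetricName name = entry.getKey();
                if ("stream-processor-node-metrics".equals(name.group())
                        && name.name().startsWith("record-e2e-latency")) {
                    System.out.println(name.name()
                        + " {task-id=" + name.tags().get("task-id")
                        + ", processor-node-id=" + name.tags().get("processor-node-id")
                        + "} = " + entry.getValue().metricValue() + " ms");
                }
            }
        }
    }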

Proposed Changes

Imagine a simple 3-node subtopology with source node O, aggregation node A, and sink node I. For any record flowing through this with record timestamp t, let tO be the system (wallclock) time when it is sent from the source topic, tA be the time when it is finished being processed by the aggregator node, and tI be the time when it leaves the sink node for the output or repartition topic. The end-to-end latency at operator O for a given record is defined as

LO(t) = tO - t

and likewise for the other operator-level end-to-end latencies. This represents the age of the record at the time it was processed by operator O. The task-level end-to-end (e2e) latency L will be computed based on the sink node, ie L = LI. The e2e latency of source nodes reading from the user input topics therefore represents the consumption latency: the time it took for a newly-created event to be read by Streams. This can be especially interesting in cases where some records may be severely delayed: for example by an IoT device with unstable network connections, or when a user's smartphone reconnects to the internet after a flight and pushes all the latest updates. On the other side, the sink node e2e latency – which is also the task-level e2e latency – reveals how long it takes for the record to be fully processed through that subtopology. If the task is the final one in the full topology, this is the full end-to-end latency of a record through Streams.
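As a rough illustration of the definition above (not the actual implementation; the class and member names here are hypothetical), the per-record computation amounts to recording the difference between the current wallclock time and the record timestamp on a sensor backing the proposed metrics:

    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.utils.Time;

    // Illustrative sketch only: records L(t) = now - t when a record leaves an operator,
    // which for the sink node yields the task-level e2e latency described above.
    class E2eLatencySketch {
        private final Time time;                 // wallclock time source
        private final Sensor e2eLatencySensor;   // would back record-e2e-latency-{min,max,p90,p99}

        E2eLatencySketch(final Time time, final Sensor e2eLatencySensor) {
            this.time = time;
            this.e2eLatencySensor = e2eLatencySensor;
        }

        void recordE2eLatency(final long recordTimestampMs) {
            final long nowMs = time.milliseconds();               // tO, tA, or tI depending on the operator
            e2eLatencySensor.record(nowMs - recordTimestampMs);   // L(t) = now - t, in milliseconds
        }
    }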

Note that for a given record, LO <= LA <= L. This holds true within and across subtopologies. A downstream subtopology will always have a task-level end-to-end latency greater than or equal to that of an upstream subtopology (which in turn implies the same holds true for the statistical measures exposed via the new metrics). Comparing the e2e latency across tasks (or across operators) will also be of interest, as this represents the processing delay: the amount of time it took for Streams to actually process the record from point A to point B within the topology. Note that this does not represent the latency of processing, since the system time is only updated at the source node. Instead it represents how long the record may have sat in a cache or suppression buffer before flowing further downstream.

Late arriving records will be included in this metric, even if they are otherwise dropped due to the grace period having passed. This metric is related but ultimately orthogonal to the concept of lateness. One difference, for example, is that late records are dropped based on stream-time, whereas this metric is always reported with respect to the current system time. The stream-time may lag the system time in low traffic, for example, or when all records are considerably delayed. This might mean the user sees no dropped records even though the e2e latency is large.
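To make this distinction concrete, the following sketch uses made-up values: in a low-traffic scenario the stream-time trails the system time, so the record is not dropped as late even though its reported e2e latency is large.

    // Made-up numbers illustrating lateness (stream-time based) vs. e2e latency (system-time based).
    public class LatenessVsE2eLatency {
        public static void main(final String[] args) {
            final long recordTimestampMs = 1_000_000L;  // t: event time of the record
            final long streamTimeMs      = 1_010_000L;  // stream-time, advanced only by observed records
            final long systemTimeMs      = 1_600_000L;  // wallclock time when the record is processed
            final long gracePeriodMs     =    60_000L;

            final long latenessMs   = streamTimeMs - recordTimestampMs;  // 10s: within the grace period
            final long e2eLatencyMs = systemTimeMs - recordTimestampMs;  // 600s: still reported as e2e latency

            System.out.println("dropped as late? " + (latenessMs > gracePeriodMs)
                + ", e2e latency = " + e2eLatencyMs + " ms");
        }
    }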

...

We define the end-to-end latency in terms of individual record timestamps, but we could have instead defined it as the difference between the system time and the stream time, ie L = tO - s where s is the stream-time. This approach has some drawbacks; first and foremost, we are losing information in the case of out-of-order data, and may not notice records with extreme delays. In the example above, an IoT device that regularly disconnects for long periods of time would push a lot of out-of-order data when it reconnects. Since these records would not advance or affect the stream-time, this delay would not be reflected as an increase in the reported metric. But the end-to-end latency of these records is of course quite high.
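For illustration, the following sketch with made-up values contrasts the two definitions: since stream-time is a high-water mark, an out-of-order record does not advance it, so a stream-time based metric would hide exactly the delay that the per-record definition surfaces.

    // Made-up numbers comparing the rejected stream-time based definition with the per-record one.
    public class StreamTimeVsPerRecordLatency {
        public static void main(final String[] args) {
            final long streamTimeMs       = 2_000_000L;  // already advanced by in-order records
            final long systemTimeMs       = 2_005_000L;  // wallclock now
            final long delayedTimestampMs = 1_000_000L;  // out-of-order record from a reconnected device

            // Stream-time is a high-water mark, so the delayed record does not move it.
            final long newStreamTimeMs = Math.max(streamTimeMs, delayedTimestampMs);

            final long streamTimeBasedMs = systemTimeMs - newStreamTimeMs;     // 5s: delay is invisible
            final long perRecordMs       = systemTimeMs - delayedTimestampMs;  // 1,005s: actual e2e latency

            System.out.println("stream-time based = " + streamTimeBasedMs
                + " ms, per-record = " + perRecordMs + " ms");
        }
    }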

Computing the metric at record intake time

This idea was originally discussed but ultimately put to rest, as it does not address the specific goal set out in this KIP: to report the time for an event to be reflected in the output. This alternative metric, which we call "staleness", has some use as a gauge of a record's age when it is received by an operator, which may have implications for how it is processed by some operators. However, this concern is orthogonal, and the alternative was thus rejected in favor of measuring at the record output.

Reporting mean or median (p50)

Rejected because:

  1. Averages do not seem to convey any particularly useful information
  2. We couldn't agree on which one to use
  3. https://youtu.be/lJ8ydIuPFeU?t=786