Status

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-9983

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently the actual end-to-end latency of a record flowing through Streams is difficult to gauge at best. It's also difficult to build real-time applications without some sense of the latency you can expect between the time that an event occurs and when this event is processed and reflected in the output results. Being able to bound this latency is an important requisite for many apps, and exposing this through metrics should go a long way towards enabling users to make the right design choices.

Public Interfaces

The following metrics would be added:

  • end-to-end-latency-max [ms]
  • end-to-end-latency-75th [ms] (75th percentile)
  • end-to-end-latency-99th [ms] (99th percentile)

These will be exposed at the task level, at recording level INFO, with the following tags:

  • type = stream-task-metrics
  • thread-id=[threadId]
  • task-id=[taskId]

We will also expose these metrics on the operator-level at the recording level DEBUG with the following tags:

  • type = stream-processor-node-metrics
  • thread-id=[threadId]
  • task-id=[taskId]
  • processor-node-id=[processorNodeId]
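
To make the three proposed statistics concrete, here is a minimal sketch of how a maximum and nearest-rank percentiles could be derived from a batch of latency samples. The class `LatencySummary` and its methods are hypothetical and exist only for illustration; Kafka's metrics library may compute percentiles differently (for example, approximately), so this shows what the numbers mean rather than how they would be implemented.

```java
import java.util.Arrays;

/** Hypothetical helper, not part of Kafka Streams: summarizes latency samples in milliseconds. */
final class LatencySummary {

    /** Largest observed latency. */
    static long max(long[] latenciesMs) {
        if (latenciesMs.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        return Arrays.stream(latenciesMs).max().getAsLong();
    }

    /** Nearest-rank percentile: the smallest sample with at least p percent of samples at or below it. */
    static long percentile(long[] latenciesMs, double p) {
        if (latenciesMs.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = latenciesMs.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(rank, 1) - 1];
    }

    public static void main(String[] args) {
        // Mostly fast records, plus two badly delayed ones.
        long[] samples = {12, 15, 11, 240, 18, 14, 13, 5000, 16, 17};
        System.out.println("end-to-end-latency-max  = " + max(samples));            // 5000
        System.out.println("end-to-end-latency-75th = " + percentile(samples, 75)); // 18
        System.out.println("end-to-end-latency-99th = " + percentile(samples, 99)); // 5000
    }
}
```

In this sample the 75th percentile stays low while the maximum and the 99th percentile are dominated by the delayed records, which is why exposing all three is more informative than any one alone.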

Proposed Changes

Imagine a simple 3-node subtopology with source node S, filter node F, and sink node I. For any record flowing through this with record timestamp t, let tS be the system (wallclock) time when it is read from the source topic, tF be the time when it is processed by the filter, and tI be the time when it reaches the sink node. The end-to-end latency at operator S for a given record is defined as 

LS(t) = tS - t

and likewise for the other operator-level end-to-end latencies. The task-level end-to-end latency L will be computed based on the source node, i.e. L = LS. The source nodes reading from the user input topics therefore represent the consumption latency, i.e. the time it took for a newly created event to be read by Streams. This can be especially interesting in cases where some records may be severely delayed: for example, by an IoT device with an unstable network connection, or when a user's smartphone reconnects to the internet after a flight and pushes all the latest updates.

Note that for a given record, LS <= LF <= LI. This holds true within and across subtopologies. A downstream subtopology will always have a task-level end-to-end latency greater than or equal to that of an upstream subtopology for a single task, which in turn implies the same holds true for the statistical measures exposed via the new metrics. Comparing the end-to-end latency across tasks (or across operators) will also be of interest, as the difference represents the processing delay: the amount of time it took for Streams to actually process the record from point A to point B within the topology. A large processing delay indicates a possible bottleneck in the topology and may help users debug their application performance.
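
As a worked illustration of these definitions, the sketch below computes the operator-level latencies for one record passing through S, F, and I, and derives the processing delay between two operators as the difference of their latencies. The class `OperatorLatency`, its method, and all timestamp values are made up for the example; this is not Kafka Streams code.

```java
/** Hypothetical illustration of the latency definitions; not Kafka Streams code. */
final class OperatorLatency {

    /** End-to-end latency at an operator: wall-clock time at the operator minus the record timestamp. */
    static long endToEndLatencyMs(long wallClockAtOperatorMs, long recordTimestampMs) {
        return wallClockAtOperatorMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        long t = 1_000L;   // record timestamp (assumed value)
        long tS = 1_250L;  // wall-clock time at source node S
        long tF = 1_260L;  // wall-clock time at filter node F
        long tI = 1_300L;  // wall-clock time at sink node I

        long lS = endToEndLatencyMs(tS, t); // 250
        long lF = endToEndLatencyMs(tF, t); // 260
        long lI = endToEndLatencyMs(tI, t); // 300

        // The record timestamp cancels out, so the difference of two
        // latencies is the time spent between the two operators.
        long processingDelayStoI = lI - lS; // 50, equal to tI - tS

        System.out.println("LS=" + lS + " LF=" + lF + " LI=" + lI);
        System.out.println("processing delay S to I = " + processingDelayStoI);
    }
}
```

Because each operator sees the record no earlier than the one before it, the ordering LS <= LF <= LI follows directly from tS <= tF <= tI.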

Late-arriving records will be included in this metric, even if they are otherwise dropped because the grace period has passed. Although we already expose a metric for the number of late records dropped, there is no way for a user to find out how late those records were. Including them in the end-to-end latency metrics may, for one thing, help users set a reasonable grace period if they see that a large number of records are being dropped. Note also that late records are dropped based on stream time, whereas this metric is always reported with respect to the current system time. The stream time may lag the system time under low traffic, for example, or when all records are considerably delayed. This might mean the user sees no dropped records even though the end-to-end latency is large.
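
The last point can be sketched with a deliberately simplified lateness rule. The rule used here (a record is dropped when its timestamp is older than the stream time minus the grace period) is an assumption made for illustration; the actual check in Kafka Streams is tied to window close times. The class `LateRecordCheck` and all values are hypothetical.

```java
/** Hypothetical illustration; the drop rule below is a simplification, not the Kafka Streams rule. */
final class LateRecordCheck {

    /** Assumed rule: dropped when the record is older than stream time minus the grace period. */
    static boolean isDropped(long recordTimestampMs, long streamTimeMs, long graceMs) {
        return recordTimestampMs < streamTimeMs - graceMs;
    }

    /** End-to-end latency is measured against wall-clock time, not stream time. */
    static long endToEndLatencyMs(long wallClockMs, long recordTimestampMs) {
        return wallClockMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        long graceMs = 5_000L;
        long wallClockMs = 3_700_000L; // "now"
        long streamTimeMs = 100_000L;  // every record is about an hour old, so stream time lags too
        long recordTimestampMs = 98_000L;

        // Only 2 seconds behind stream time, so it is within the grace period...
        System.out.println("dropped = " + isDropped(recordTimestampMs, streamTimeMs, graceMs)); // false
        // ...yet it is roughly an hour behind the wall clock.
        System.out.println("latency = " + endToEndLatencyMs(wallClockMs, recordTimestampMs));   // 3602000
    }
}
```

Under these assumptions the dropped-records count stays at zero while the end-to-end latency metric reports a delay of about an hour.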

Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

Using stream time

We define the latency in terms of individual record timestamps, but we could have instead defined it as the difference between the system time and the stream time, i.e. LS = tS - st, where st is the stream time. This approach has some drawbacks; first and foremost, we would lose information in the case of out-of-order data and might not notice records with extreme delays. In the example above, an IoT device that regularly disconnects for long periods of time would push a lot of out-of-order data when it reconnects. Since these records would not advance or affect the stream time, this delay would not be reflected as an increase in the latency metric. But the end-to-end latency of these records is of course quite high.
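
The difference between the two definitions can be sketched as follows. The tracker below treats stream time as the maximum record timestamp seen so far, which is a simplifying assumption, and reports both candidate latencies for each record. The class `StreamTimeVsRecordTime` and all values are hypothetical.

```java
/** Hypothetical comparison of the two latency definitions; not Kafka Streams code. */
final class StreamTimeVsRecordTime {

    // Assumption: stream time is the largest record timestamp observed so far.
    private long streamTimeMs = Long.MIN_VALUE;

    /** Returns {record-timestamp-based latency, stream-time-based latency} for one record. */
    long[] observe(long recordTimestampMs, long wallClockMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        return new long[] {wallClockMs - recordTimestampMs, wallClockMs - streamTimeMs};
    }

    public static void main(String[] args) {
        StreamTimeVsRecordTime tracker = new StreamTimeVsRecordTime();

        // An in-order record: both definitions agree.
        long[] inOrder = tracker.observe(99_900L, 100_000L);
        System.out.println(inOrder[0] + " vs " + inOrder[1]);       // 100 vs 100

        // A badly delayed, out-of-order record from a reconnecting device:
        // it does not advance stream time, so only the record-based value shows the delay.
        long[] outOfOrder = tracker.observe(10_000L, 100_050L);
        System.out.println(outOfOrder[0] + " vs " + outOfOrder[1]); // 90050 vs 150
    }
}
```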


