Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/m5ockoork0h2zr78h77dcrn71rbt35ql
Vote thread: https://lists.apache.org/thread/8b38w9xp1ns9lr3mnxb15d26jzxp6sz4
JIRA: FLINK-34025 (ASF JIRA)
Release: 1.20

Motivation

Currently, users have to click on every operator and check how much data each sub-task is processing to see whether there is data skew. This is particularly cumbersome and error-prone for jobs with large job graphs. Data skew is an important signal that should be more visible.

Public Interfaces

This is not expected to change the programming interface; rather, it is a change in the Flink Dashboard UI and an addition to the list of available metrics.

Proposed Changes

Data Skew Score

The score uses a variant of the coefficient of variation (https://en.wikipedia.org/wiki/Coefficient_of_variation):

Code Block
min(average_absolute_deviation(list_of_number_of_records_received_by_each_subtask) / mean(list_of_number_of_records_received_by_each_subtask) * 100, 100)

Instead of the standard deviation, I chose to use the average absolute deviation, which avoids the multiplication and square-root operations and simplifies the metric calculation while serving the same purpose and yielding similar results.

We need to cap the metric at 100% because the coefficient-of-variation calculation can exceed 100 when there is large variance in the dataset, hence the min operation.

The above metric seems to yield reasonable results. Examples:

Number of records received by sub-tasks within an operator | Data Skew Score
1 1 1 5 10 (five subtasks: the first three receive 1 record each; the last two receive 5 and 10 records) | 88%
5 5 5 5 5 | 0%
1 1 5 5 5 | 54%
4 5 5 5 5 | 7%
0 0 0 0 0 | 0 (idle operator)
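As an illustrative sketch of the formula above (the function name is hypothetical, not part of the FLIP), the score can be computed as follows:

```python
def data_skew_score(records_per_subtask):
    """Data skew score: average absolute deviation / mean, as a percentage,
    capped at 100.

    records_per_subtask: number of records received by each subtask of one
    operator (e.g. built from the existing numRecordsIn metric).
    """
    n = len(records_per_subtask)
    mean = sum(records_per_subtask) / n
    if mean == 0:
        return 0.0  # idle operator: the ratio is undefined, so report 0
    aad = sum(abs(x - mean) for x in records_per_subtask) / n
    return min(aad / mean * 100, 100.0)
```

For example, `data_skew_score([5, 5, 5, 5, 5])` returns 0.0, and `data_skew_score([0, 0, 0, 0, 100])` is capped at 100.0.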

Proposed metrics at Operator level:

  • dataSkewPercentage: This will be used to show an overall or historical data skew score under the proposed Data Skew tab (see the UI Changes section).
    • The existing numRecordsIn metric can be used to build this new metric
  • dataSkewPercentagePerSecond: This will be used to show a "live" score on the Job Graph (see the UI Changes section).
    • The existing numRecordsInPerSecond metric can be used to build this new metric

See the "rejected alternatives" section for other metrics that were considered.

Note that this FLIP is designed to address an immediate gap in the monitoring of Flink jobs in the Flink UI. If the proposed data skew score is found to be not sufficient or if the users prefer a different metric, it can be improved in future FLIPs, and indeed can be made configurable (e.g. by using the Strategy Pattern).

UI Changes

Additional "Data Skew" Metric on the Flink Job Graph

As shown in the screenshot below, each operator on the Flink job graph UI would show an additional data skew score.

The accumulation of the number of records received over a long period can hide a recent data skew event; it can likewise hide a recent fix to an existing data skew problem. Therefore, the proposed metric will need to look at the change in the number of records received within a recent period, similar to the existing Backpressure and Busy metrics on the Flink Job Graph, and show a "live" data skew score.
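A minimal sketch of this windowed "live" score, assuming the per-subtask cumulative counts are sampled periodically (names and sampling approach are illustrative, not the FLIP's actual implementation):

```python
def live_skew_score(prev_counts, curr_counts, interval_seconds):
    """Live data skew score over a recent window.

    prev_counts / curr_counts: cumulative record counts per subtask at the
    start and end of the window (e.g. from numRecordsIn). Using the deltas,
    i.e. a per-second rate like the existing numRecordsInPerSecond metric,
    prevents a long history of accumulated records from hiding a recent
    skew event or a recent fix.
    """
    rates = [(c - p) / interval_seconds for p, c in zip(prev_counts, curr_counts)]
    n = len(rates)
    mean = sum(rates) / n
    if mean == 0:
        return 0.0  # no records received in the window: treat as idle
    aad = sum(abs(r - mean) for r in rates) / n
    return min(aad / mean * 100, 100.0)
```

With this, an operator that processed billions of records evenly in the past but is skewed right now still reports a high live score, because only the recent window is considered.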

Additional Tab to List All Operators and Their Data Skew Score in Descending Order of Their Data Skew Score

The proposed tab would sit next to the Exceptions tab, as its purpose seems to me closer to that of the Exceptions tab than to the other tabs. It is highlighted in blue in the screenshot below. Note that the screenshot/mock-up does not show the Data Skew tab next to the Exceptions tab, but the actual implementation will put it there.

The look and feel of the proposed Data Skew tab will be compatible with the rest of the UI. The list/table view of checkpoints under the Checkpoints tab could be used for inspiration. The content of the proposed tab will roughly look as follows:

It will show the overall accumulated data skew score of the operators, as opposed to the current/live view proposed under the Additional "Data Skew" Metric on the Flink Job Graph section. This tab will also contain a definition of data skew and of the metric used to calculate it, placed before the table/list of operators (this is not shown in the screenshot and is left as an implementation detail). It will have a Refresh button, as shown in the screenshot, similar to the Checkpoints tab.

A Note on Using Feature Flag / Config for Enabling the Proposed Changes

The proposed changes should only improve the user experience, without any unwanted side effects. Therefore, this proposal does not suggest using a feature flag or config to enable the new feature. However, I don't consider this a major contentious point for the FLIP and am happy to revisit it before/during the implementation phase (e.g. after testing the impact on the UI).

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

No negative impact. Existing users will get a better overview of their data skew state and will be able to see at a glance whether their job is suffering from data skew.

  • If we are changing behavior how will we phase out the older behavior?

N/A

  • If we need special migration tools, describe them here.

N/A

  • When will we remove the existing behavior?

N/A

Test Plan

UI Tests

The following end-to-end test scenarios will be carried out for the UI:

  • Data skew score is accurately presented on the Job Graph vertices and is refreshed, similar to the existing Backpressure and Busy metrics
  • The new Data Skew tab lists the operators in descending order of their data skew score
    • This tab shows the data skew score for operators from the beginning of the job. That is, it does not show the live/current data skew score.
    • Data skew score is refreshed every time the tab is loaded
    • A refresh button similar to that on the Checkpoint page can be used to refresh the data on screen
  • Impact on UI
    • Page load speed comparison (there should be no extra delay to the job graph page load time). If a delay turns out to be unavoidable, this should be documented as a side effect
    • Monitor page memory usage and ensure there is no memory leak and that memory usage is not noticeably higher than before. If a noticeable increase in memory usage is unavoidable, document it as a side effect

Data Skew Score Tests

The following behaviour will be unit and end-to-end tested:

  • Test case for the metric where the result is undefined (i.e. an idle operator); we will show 0 as the data skew score on the UI
  • Boundary tests: Ensure result can't go beyond 100% or below 0%
  • Define test scenarios for 5%, 50%, 99% data skew score and automate the tests

Rejected Alternatives

Rejected data skew score metrics

Metric 1:

Code Block
(max(list_of_number_of_records_received_by_each_subtask) - min(list_of_number_of_records_received_by_each_subtask)) / sum(list_of_number_of_records_received_by_each_subtask)

Example scenarios:

  • Given 10 sub-tasks, 1 of which has received only 1 record. The other 9 sub-tasks have each received 10 records.
    Data skew score: (10 - 1) / (9*10 + 1*1) = 9/91 ≈ 10%
  • Given 10 sub-tasks. 5 sub-tasks have each received only 1 record. Each of the other 5 sub-tasks has received 10 records.
    Data skew score: (10 - 1) / (5*1 + 5*10) = 9/55 ≈ 16%
  • Given 10 sub-tasks. 5 sub-tasks have each received only 1 record. Each of the other 5 sub-tasks has received 50 records.
    Data skew score: (50 - 1) / (5*1 + 5*50) = 49/255 ≈ 19%

This metric gives somewhat low scores for what I'd consider significant skew. For instance, in the last example half of the sub-tasks received 50x fewer records than the other half, yet the score is only about 19%.

Metric 2:

Code Block
min(list_of_number_of_records_received_by_each_subtask) / max(list_of_number_of_records_received_by_each_subtask)

Example scenarios:

  • Given 10 sub-tasks, 1 of which has received only 1 record. The other 9 sub-tasks have each received 10 records.
    Data skew score: 1/10 = 10%
  • Given 10 sub-tasks. 5 sub-tasks have each received only 1 record. Each of the other 5 sub-tasks has received 10 records.
    Data skew score: 1/10 = 10%
  • Given 10 sub-tasks. 5 sub-tasks have each received only 1 record. Each of the other 5 sub-tasks has received 50 records.
    Data skew score: 1/50 = 2%

As can be seen above, this metric does not take the number of affected sub-tasks into account and therefore yields misleading results: the first two scenarios have very different amounts of skew but receive the same score.
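For illustration (function names are hypothetical, not part of the FLIP), the accepted metric and the two rejected metrics can be compared side by side on the last scenario above:

```python
def chosen_score(counts):
    # Accepted metric: average absolute deviation / mean, as a percentage, capped at 100.
    mean = sum(counts) / len(counts)
    if mean == 0:
        return 0.0  # idle operator
    aad = sum(abs(x - mean) for x in counts) / len(counts)
    return min(aad / mean * 100, 100.0)

def rejected_range_score(counts):
    # Rejected metric 1: (max - min) / sum, as a percentage.
    return (max(counts) - min(counts)) / sum(counts) * 100

def rejected_ratio_score(counts):
    # Rejected metric 2: min / max, as a percentage.
    return min(counts) / max(counts) * 100

# Half of the sub-tasks receive 50x fewer records than the other half:
counts = [1] * 5 + [50] * 5
```

On this input the rejected metrics report roughly 19% and 2% respectively, while the accepted score reports about 96%, which better reflects the severity of the skew.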