...
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
Motivation
Currently, users have to click on every operator and check how much data each sub-task is processing to see whether there is data skew. This is particularly cumbersome and error-prone for jobs with large job graphs. Data skew is an important metric that should be more visible.
Public Interfaces
Exposed monitoring information. This is not expected to be a change in the programming interface, but rather a change in the Flink Dashboard UI and an addition to the list of available metrics.
...
min(average_absolute_deviation(list_of_number_of_records_received_by_each_subtask) / mean(list_of_number_of_records_received_by_each_subtask) * 100, 100)
Instead of using the standard deviation, I chose the average absolute deviation, which avoids the multiplication and square-root operations and simplifies the metric calculation while serving the same purpose and yielding similar results.
...
Number of records received by sub-tasks within an operator | Data Skew Score |
---|---|
1 1 1 5 10 (i.e. five sub-tasks: the first three each receive 1 record, and the last two receive 5 and 10 records respectively) | 88% |
5 5 5 5 5 | 0% |
1 1 5 5 5 | 54% |
4 5 5 5 5 | 7% |
0 0 0 0 0 | 0% (idle operator) |
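The proposed score can be sketched in a few lines of Python (illustrative only: the function name and the treatment of the idle/undefined case as 0 are my choices, and the table's figures may be rounded slightly differently):

```python
def data_skew_score(records_per_subtask):
    """Proposed data skew score: average absolute deviation relative
    to the mean, expressed as a percentage and capped at 100."""
    n = len(records_per_subtask)
    mean = sum(records_per_subtask) / n
    if mean == 0:
        return 0.0  # idle operator: the ratio is undefined, so report 0
    aad = sum(abs(x - mean) for x in records_per_subtask) / n
    return min(aad / mean * 100, 100)

print(round(data_skew_score([1, 1, 1, 5, 10])))  # heavy skew, score close to the ~88% above
print(round(data_skew_score([5, 5, 5, 5, 5])))   # perfectly balanced -> 0
print(round(data_skew_score([4, 5, 5, 5, 5])))   # mild skew -> 7
print(round(data_skew_score([0, 0, 0, 0, 0])))   # idle operator -> 0
```

The cap at 100 keeps the score in a fixed 0-100% range even for extreme distributions, which makes it easy to render consistently in the UI.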
The accumulation of "received number of records" over a long period of time can hide a recent data skew event. The same can also hide a recent fix to an existing data skew problem. Therefore the proposed metric will need to look at the change in the received number of records within a period, similar to the existing "Backpressure" or "Busy" metrics on the Flink Job Graph.
Proposed metrics at Operator level:
- dataSkewPercentage: This will be used to show an overall or historical data skew score under the proposed Data Skew tab (see the UI Changes section).
- The existing numRecordsIn metric can be used to build this new metric
- dataSkewPercentagePerSecond: This will be used to show a "live" score on the Job Graph (see the UI Changes section).
- The existing numRecordsInPerSecond metric can be used to build this new metric
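The relationship between the two proposed metrics can be sketched as follows (a hypothetical illustration, not Flink's metric API: the sampling mechanics, function names, and the example counter values are assumptions). The per-second variant applies the same formula to the rates observed between two samples of the accumulated counters:

```python
def data_skew_percentage(counts):
    """Overall score built from accumulated numRecordsIn counters
    (one counter value per sub-task)."""
    n = len(counts)
    mean = sum(counts) / n
    if mean == 0:
        return 0.0
    aad = sum(abs(c - mean) for c in counts) / n
    return min(aad / mean * 100, 100)

def data_skew_percentage_per_second(prev_counts, curr_counts, interval_s):
    """'Live' score: the same formula applied to per-second rates
    derived from two samples of the accumulated counters."""
    rates = [(c - p) / interval_s for p, c in zip(prev_counts, curr_counts)]
    return data_skew_percentage(rates)

# Counters accumulated since job start can hide that a skew was recently fixed:
prev = [1000, 1000, 9000]  # sample at time t
curr = [1100, 1100, 9100]  # sample at t + 10s: every sub-task now receives 10 rec/s
print(data_skew_percentage(curr))                          # high: dominated by history
print(data_skew_percentage_per_second(prev, curr, 10.0))   # 0.0: live view shows no skew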
See the "rejected alternatives" section for other metrics that were considered.
Note that this FLIP is designed to address an immediate gap in the monitoring of Flink jobs in the Flink UI. If the proposed data skew score is found to be not sufficient or if the users prefer a different metric, it can be improved in future FLIPs, and indeed can be made configurable (e.g. by using the Strategy Pattern).
UI Changes
Additional "Data Skew" Metric on the Flink Job Graph
As shown in the screenshot below, each operator on the Flink job graph UI would show an additional data skew score.
...
As noted above, record counts accumulated over a long period can hide a recent data skew event, or a recent fix to an existing one. The score shown on the Job Graph will therefore be computed over a recent period, similar to the existing "Backpressure" and "Busy" metrics, and presented as a "live" data skew score.
Additional Tab Listing All Operators in Descending Order of Their Data Skew Score
The proposed tab would sit next to the Exceptions tab, as its purpose seems to me closer to that of the Exceptions tab than to the other tabs. It is highlighted in blue in the screenshot below.
Note that the screenshot/mock-up below does not show the Data Skew tab next to the Exceptions tab, but the actual implementation will put it there.
This FLIP does not prescribe in detail how the UI of this new Data Skew tab should look. The look and feel should be consistent with the rest of the UI; the list/table view of checkpoints under the Checkpoints tab could be used for inspiration. The content of the proposed tab will look roughly as follows:
It will show the overall accumulated data skew score of the operators, as opposed to the current/live view proposed under the Additional "Data Skew" Metric on the Flink Job Graph section. This tab will also contain a definition of data skew and of the metric used to calculate it, placed before the table/list of operators (this is not shown in the screenshot and is left as an implementation detail). It will have a Refresh button, as shown in the screenshot, similar to the Checkpoints tab.
A Note on Using Feature Flag / Config for Enabling the Proposed Changes
The proposed changes should only improve the user experience, without any unwanted side-effects. Therefore, this proposal does not suggest using a feature flag or config to enable the new feature. However, I don't consider this a major point of contention for the FLIP and am happy to revisit it before/during the implementation phase (e.g. after testing the impact on the UI).
Compatibility, Deprecation, and Migration Plan
...
The following end-to-end test scenarios will be carried out for the UI:
- Data skew score is accurately presented on the Job Graph vertices and is refreshed, similar to the existing Backpressure and Busy metrics
- The new Data Skew tab lists the operators in descending order of their data skew score
- This tab shows the data skew score for operators from the beginning of the job. That is, it does not show the live/current data skew score.
- Data skew score is refreshed every time the tab is loaded
- A refresh button similar to that on the Checkpoint page can be used to refresh the data on screen
- Given a job with no or close to 0 data skew, all operators show a data skew score of 0% or a figure close to 0%
- Given a job with an operator suffering from data skew of about 50%, the figure is accurately reflected on the operator in the Flink job graph
- The above scenarios are tested under the new Data Skew tab
- Operators are sorted according to their data skew score
- Impact on UI
- Page load speed comparison (there should be no extra delay to the job graph page load time). If a delay turns out to be unavoidable, this should be documented as a side-effect
- Monitor page memory usage and ensure there is no memory leak and that memory usage is not noticeably higher than before. If a noticeable increase in memory usage is unavoidable, document this as a side-effect
Data Skew Score Tests
The following behaviour will be unit and end-to-end tested:
- Test case for the metric where the result is undefined (i.e. an idle operator); we will show 0 as the data skew score on the UI
- Boundary tests: ensure the result can't go above 100% or below 0%
- Define test scenarios for 5%, 50%, 99% data skew score and automate the tests
...
Rejected data skew score metrics
Metric 1:
(max(list_of_number_of_records_received_by_each_subtask) - min(list_of_number_of_records_received_by_each_subtask)) / sum(list_of_number_of_records_received_by_each_subtask)
Example scenarios:
- Given 10 sub-tasks, 1 of which has received only 1 record, while the other 9 sub-tasks have each received 10 records.
Data skew score: (10 - 1) / (9*10 + 1*1) ≈ 10%
- Given 10 sub-tasks, 5 of which have each received only 1 record, while the other 5 have each received 10 records.
Data skew score: (10 - 1) / (5*1 + 5*10) ≈ 16%
- Given 10 sub-tasks, 5 of which have each received only 1 record, while the other 5 have each received 50 records.
Data skew score: (50 - 1) / (5*1 + 5*50) ≈ 19%
This metric gives somewhat low scores for what I'd consider significant skew. For instance, in the last example half of the sub-tasks received 50x fewer records than the other half, yet the score is only about 19%.
Metric 2:
min(list_of_number_of_records_received_by_each_subtask) / max(list_of_number_of_records_received_by_each_subtask)
- Given 10 sub-tasks, 1 of which has received only 1 record, while the other 9 sub-tasks have each received 10 records.
Data skew score: 1 / 10 = 10%
- Given 10 sub-tasks, 5 of which have each received only 1 record, while the other 5 have each received 10 records.
Data skew score: 1 / 10 = 10%
- Given 10 sub-tasks, 5 of which have each received only 1 record, while the other 5 have each received 50 records.
Data skew score: 1 / 50 = 2%
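For comparison, the two rejected metrics can be evaluated side by side with a short Python sketch (illustrative only; the function names are mine, and the formulas are taken directly from the code blocks above):

```python
def metric1(counts):
    """Rejected metric 1: (max - min) / sum, as a percentage."""
    return (max(counts) - min(counts)) / sum(counts) * 100

def metric2(counts):
    """Rejected metric 2: min / max, as a percentage.
    Note the inverted scale: 100 means perfectly balanced."""
    return min(counts) / max(counts) * 100

# Worst scenario above: half the sub-tasks receive 50x fewer records.
scenario = [1] * 5 + [50] * 5
print(round(metric1(scenario)))  # low score despite severe skew
print(round(metric2(scenario)))  # 2: also hard to read, since 100 would mean *no* skew
```

This makes the weakness concrete: metric 1 stays low even for severe skew, and metric 2's scale runs opposite to an intuitive "higher means more skewed" reading.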
...