Discussion thread: https://lists.apache.org/thread/832tk3zvysg45vrqrv5tgbdqx974pm3m
Vote thread: https://lists.apache.org/thread/0xonn4y8lkj49comors0z86tpyzkvvqg
Release: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As a follow-up step of FLIP-168 and FLIP-224, the Flink Web UI needs to be enhanced to display the related information if the speculative execution mechanism is enabled.

Public Interfaces

The following parts of the Flink Web UI and the corresponding REST API will be changed:

  • Currently, the detail pages of each job vertex (namely Subtasks, TaskManagers, Backpressure, and Metrics) assume that each subtask has only one attempt. With speculative execution enabled, this assumption no longer holds, and we need to show information for each attempt separately for subtasks with multiple running attempts.
  • With speculative execution enabled, task managers that keep running slowly will be marked as blocked and won’t serve resource requests any more. Users might be confused if these task managers are still counted as available in the Web UI. Therefore, we need to show the number of blocked task managers in the overview page when necessary, and label these task managers as “Blocked” in the task manager list page.

We expect all changes to the Web UI and REST APIs to be transparent to users who don’t use speculative execution.

Web UI Changes

Subtasks

As shown in the following figure, if speculative execution is enabled, subtasks with multiple running attempts will be expandable. One of the fastest attempts is shown in the plain list, and the others are folded.

TaskManagers

The UI of the TaskManagers list won’t change, but each entry will now aggregate the status of all the subtask attempts running on that task manager, instead of only the current attempts of some subtasks.

Backpressure

Similar to the subtask lists, the subtasks with multiple running attempts will become expandable. When expanded, the non-current attempts will be shown and the subtask column will be decorated with the corresponding attempt numbers. 

FlameGraph

Currently, the FlameGraph aggregates the statistics from the current attempts of all the subtasks. If speculative execution is enabled and some subtasks have multiple running attempts, we’ll change it to aggregate over all these attempts instead. This change requires no modification to either the UI or the REST API.

Blocked task manager and slot count

If the number of blocked task managers or blocked free slots is not zero, the corresponding count will be displayed in the Task Managers and Slots cards, labeled "Blocked Task Managers" and "Blocked Free Slots" respectively.

Task manager blocked state

Blocked task managers will also be labeled in the list and detail pages, as shown in the following figures.

REST API Changes

To keep the following REST API changes transparent to users who don’t use speculative execution, the response bodies won’t change when speculative execution is not in use.

Listing subtasks

If speculative execution is enabled, an other-concurrent-attempts list is added to the subtask information, elements of which contain all information of non-current attempts. 

URL: /jobs/:jobid/vertices/:vertexid

Response: 

{
    "id":"0a448493b4782967b150582570326227",
    "name":"Map",
    "parallelism":3,
    "maxParallelism":6,
    "now":1656663291812,
    "subtasks":[
        {
            "subtask":0,
            "status":"RUNNING",
            "attempt":0,
            "host":"localhost",
            "start-time":1656663195521,
            "end-time":-1,
            "duration":96291,
            "metrics":{
                …
            },
            "taskmanager-id":"3072999b-789e-40d6-bf0f-1f3c5bf9a7be",
            "start_time":1656663195521,
            "other-concurrent-attempts":[
                {
                    "status":"RUNNING",
                    "attempt":1,
                    "host":"localhost",
                    "start-time":1656661200454,
                    "end-time":-1,
                    "duration":531021,
                    "metrics":{
                        …
                    },
                    "taskmanager-id":"f8b29488-53bf-4245-8081-42969cc8e942",
                    "start_time":1656661200454
                }
            ]
        },
        …
    ]
}

URL: /jobs/:jobid/vertices/:vertexid/subtasks/:subtaskIndex

Response: 

{
    "subtask":0,
    "status":"RUNNING",
    "attempt":0,
    "host":"localhost",
    "start-time":1656663195521,
    "end-time":-1,
    "duration":96291,
    "metrics":{
        …
    },
    "taskmanager-id":"3072999b-789e-40d6-bf0f-1f3c5bf9a7be",
    "start_time":1656663195521,
    "other-concurrent-attempts":[
        {
            "status":"RUNNING",
            "attempt":1,
            "host":"localhost",
            "start-time":1656661200454,
            "end-time":-1,
            "duration":531021,
            "metrics":{
                …
            },
            "taskmanager-id":"f8b29488-53bf-4245-8081-42969cc8e942",
            "start_time":1656661200454
        }
    ]
}
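
A client that wants one row per attempt can flatten this structure. The following is a minimal sketch, not part of the proposal: the helper name `all_attempts` and the trimmed-down sample are hypothetical, and the code assumes the response has already been parsed into a Python dict. It treats `other-concurrent-attempts` as optional, so responses from jobs without speculative execution are handled the same way.

```python
from typing import Any, Dict, List


def all_attempts(subtask: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the current attempt followed by any other concurrent attempts."""
    # The fields of the current attempt live directly on the subtask entry.
    current = {k: v for k, v in subtask.items() if k != "other-concurrent-attempts"}
    others = [
        # Non-current attempts do not repeat the subtask index, so copy it over.
        {"subtask": subtask["subtask"], **attempt}
        for attempt in subtask.get("other-concurrent-attempts", [])
    ]
    return [current] + others


# Trimmed-down sample mirroring the response shown above (most fields omitted).
sample_subtask = {
    "subtask": 0,
    "status": "RUNNING",
    "attempt": 0,
    "taskmanager-id": "3072999b-789e-40d6-bf0f-1f3c5bf9a7be",
    "other-concurrent-attempts": [
        {
            "status": "RUNNING",
            "attempt": 1,
            "taskmanager-id": "f8b29488-53bf-4245-8081-42969cc8e942",
        }
    ],
}

attempt_numbers = [a["attempt"] for a in all_attempts(sample_subtask)]
print(attempt_numbers)
```

Because the extra list is read with a default, the same helper returns a single-element list for a subtask entry that has no `other-concurrent-attempts` field.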

Listing TaskManagers

As described above, the response JSON format for listing task managers won’t change. However, each entry will now aggregate the status of all the subtask attempts running on that task manager.
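
To illustrate what "aggregating over all attempts" means, here is a rough client-side sketch. It is not the REST handler's implementation: the function name `status_counts_by_taskmanager` is hypothetical, the IDs `tm-a` and `tm-b` are placeholders, and the input is assumed to be the `subtasks` list in the shape shown under "Listing subtasks".

```python
from collections import Counter, defaultdict
from typing import Any, Dict, List


def status_counts_by_taskmanager(subtasks: List[Dict[str, Any]]) -> Dict[str, Counter]:
    """Count attempt statuses per task manager, including non-current attempts."""
    counts: Dict[str, Counter] = defaultdict(Counter)
    for subtask in subtasks:
        # The current attempt is the subtask entry itself; the others are nested.
        attempts = [subtask] + subtask.get("other-concurrent-attempts", [])
        for attempt in attempts:
            counts[attempt["taskmanager-id"]][attempt["status"]] += 1
    return dict(counts)


# Placeholder data: subtask 0 has a second attempt on a different task manager.
sample_subtasks = [
    {
        "subtask": 0,
        "status": "RUNNING",
        "taskmanager-id": "tm-a",
        "other-concurrent-attempts": [
            {"status": "RUNNING", "attempt": 1, "taskmanager-id": "tm-b"}
        ],
    },
    {"subtask": 1, "status": "RUNNING", "taskmanager-id": "tm-a"},
]

counts = status_counts_by_taskmanager(sample_subtasks)
print({tm: dict(c) for tm, c in counts.items()})
```

If only current attempts were counted, `tm-b` would not appear at all in this sample, which is the gap the aggregation change closes.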

Listing Backpressure States

If speculative execution is enabled, an other-concurrent-attempts list is added to the subtask information, elements of which contain all information of non-current attempts.

URL: /jobs/:jobid/vertices/:vertexid/backpressure

Response:

{
    "status": "ok",
    "backpressure-level": "ok",
    "end-timestamp": 1656923556965,
    "subtasks": [
        {
            "subtask": 0,
            "attempt": 1,
            "backpressure-level": "ok",
            "ratio": 0,
            "idleRatio": 0,
            "busyRatio": "NaN",
            "other-concurrent-attempts": [
                {
                    "attempt": 2,
                    "backpressure-level": "ok",
                    "ratio": 0,
                    "idleRatio": 0,
                    "busyRatio": "NaN"
                }
            ]
        }
    ]
}
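
The expandable rows of the Backpressure tab can be derived from this response roughly as follows. This is only a sketch under assumptions: the function name `backpressure_rows` and the label format `"<subtask> (attempt <n>)"` are hypothetical and are not taken from the actual Web UI code.

```python
from typing import Any, Dict, List, Tuple


def backpressure_rows(response: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return (subtask label, backpressure level) pairs, one per attempt."""
    rows: List[Tuple[str, str]] = []
    for subtask in response["subtasks"]:
        index = subtask["subtask"]
        others = subtask.get("other-concurrent-attempts", [])
        if others:
            # Decorate the label with the attempt number only when a subtask
            # has several attempts, so other subtasks keep their plain label.
            label = f"{index} (attempt {subtask['attempt']})"
        else:
            label = str(index)
        rows.append((label, subtask["backpressure-level"]))
        for other in others:
            rows.append((f"{index} (attempt {other['attempt']})", other["backpressure-level"]))
    return rows


# Sample mirroring the response shown above (ratio fields omitted).
sample_backpressure = {
    "status": "ok",
    "backpressure-level": "ok",
    "subtasks": [
        {
            "subtask": 0,
            "attempt": 1,
            "backpressure-level": "ok",
            "other-concurrent-attempts": [
                {"attempt": 2, "backpressure-level": "ok"}
            ],
        }
    ],
}

rows = backpressure_rows(sample_backpressure)
print(rows)
```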

Show Blocked Task Manager Count

If the number of blocked task managers is not zero, a field named taskmanagers-blocked is added, which carries the number of blocked task managers. The response below likewise carries the number of blocked free slots in a slots-blocked field.

URL: /overview

Response: 

{
    "taskmanagers":4,
    "taskmanagers-blocked":1,
    "slots-total":12,
    "slots-available":2,
    "slots-blocked":2,
    "jobs-running":1,
    "jobs-finished":0,
    "jobs-cancelled":0,
    "jobs-failed":0,
    "flink-version":"1.16-SNAPSHOT",
    "flink-commit":"xxxxxxxx"
}
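
Since the blocked counters appear only when they are non-zero, a consumer of /overview should read them with a default. A minimal sketch, assuming the response is already parsed into a dict; the helper name `blocked_cards` is hypothetical, and the card labels are the ones named in the Web UI section:

```python
from typing import Any, Dict


def blocked_cards(overview: Dict[str, Any]) -> Dict[str, int]:
    """Return the blocked counters to display, skipping those that are zero or absent."""
    candidates = {
        "Blocked Task Managers": overview.get("taskmanagers-blocked", 0),
        "Blocked Free Slots": overview.get("slots-blocked", 0),
    }
    # Only non-zero counts are shown, matching the behavior described for the UI.
    return {label: count for label, count in candidates.items() if count != 0}


# Subset of the sample response shown above.
sample_overview = {
    "taskmanagers": 4,
    "taskmanagers-blocked": 1,
    "slots-total": 12,
    "slots-available": 2,
    "slots-blocked": 2,
}

print(blocked_cards(sample_overview))
```

For a response without the two fields, the helper returns an empty dict, so nothing extra is rendered.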

Show Task manager Blocked State

If speculative execution is enabled, a flag field named blocked is added, indicating whether a task manager is blocked.

URL: /taskmanagers

Response: 

{
    "taskmanagers":[
        {
            "id":"1e5a5949-6cff-4116-9186-77f1c85d2bb3",
            "path":"akka://flink/user/rpc/taskmanager_7",
            "blocked":true,
            "dataPort":61329,
            "jmxPort":-1,
            "timeSinceLastHeartbeat":1656662626801,
            "slotsNumber":3,
            "freeSlots":1,
            "totalResource":{
                …
            },
            "freeResource":{
                …
            },
            "hardware":{
                …
            },
            "memoryConfiguration":{
                …
            }
        }
    ]
}
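
As a hedged illustration of how a client might use the new flag, the sketch below lists the IDs of blocked task managers from a parsed /taskmanagers response. The function name `blocked_taskmanager_ids` is hypothetical, and the second entry in the sample is an invented placeholder. The flag is read with a default of `False`, on the assumption that it may be absent when speculative execution is disabled.

```python
from typing import Any, Dict, List


def blocked_taskmanager_ids(response: Dict[str, Any]) -> List[str]:
    """Return the IDs of all task managers whose "blocked" flag is set."""
    return [
        tm["id"]
        for tm in response["taskmanagers"]
        # A missing flag is treated as "not blocked".
        if tm.get("blocked", False)
    ]


sample_taskmanagers = {
    "taskmanagers": [
        {"id": "1e5a5949-6cff-4116-9186-77f1c85d2bb3", "blocked": True},
        # Placeholder entry without the flag, as it might look without the feature.
        {"id": "placeholder-taskmanager-id"},
    ]
}

blocked_ids = blocked_taskmanager_ids(sample_taskmanagers)
print(blocked_ids)
```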

URL: /taskmanagers/:taskManagerId

Response: 

{
    "id":"1e5a5949-6cff-4116-9186-77f1c85d2bb3",
    "path":"akka://flink/user/rpc/taskmanager_7",
    "blocked":true,
    "dataPort":61329,
    "jmxPort":-1,
    "timeSinceLastHeartbeat":1656662931281,
    "slotsNumber":3,
    "freeSlots":1,
    "totalResource":{
        …
    },
    "freeResource":{
        …
    },
    "hardware":{
        …
    },
    "memoryConfiguration":{
        …
    },
    "allocatedSlots":[
        …
    ],
    "metrics":{
        …
    }
}

Limitation and Future Work

The detail pages of each job vertex also include a page showing the metrics of all its subtasks. Currently it shows the metrics from the current attempts of all the subtasks. Logically, with speculative execution enabled, it should show the metrics from all the running attempts if some subtasks have multiple running attempts. However, we noticed that for job vertices with high parallelism, returning the metrics of all the subtasks in a single call already causes performance issues, and the change described above would aggravate them.

Therefore, we propose to keep the page as is for now, and to start a separate thread on optimizing the current page, along with supporting subtasks with multiple attempts.


Proposed Changes

Besides the changes to the REST handlers and the corresponding data structures, the following components will be changed as well.

MetricStore

To acquire the metrics of execution attempts, which will be displayed in the subtask list, SubtaskMetricStore is introduced. It keeps the metrics of all execution attempts of a subtask.

When a metric is added to the MetricStore, it is added to the SubtaskMetricStore as well. Only metrics of the current execution attempt will be added as the metrics of the subtask in the TaskMetricStore.

To add and acquire the metrics of a specific attempt, an `attemptNum` field is added to the TaskQueryScopeInfo and OperatorQueryScopeInfo.

Note: MetricStore is only used in the REST server. All changes here do not affect the MetricReporters.

MetricFetcher and JobDetails

To determine whether an execution attempt is the current attempt, the MetricStore needs to be informed of the current execution attempts. This information is added to the JobDetails. The MetricFetcher will retrieve the JobDetails and then update the current execution attempts in the MetricStore.
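
The interplay between per-attempt metrics and the current attempt can be sketched as follows. This is an illustrative model written in Python for brevity, not Flink's Java implementation: the class `SubtaskMetricsSketch` and its methods are hypothetical, and refreshing the subtask-level view when the current attempt changes is an assumption rather than something this proposal specifies.

```python
from collections import defaultdict
from typing import Dict


class SubtaskMetricsSketch:
    """Hypothetical stand-in for the metric bookkeeping of a single subtask."""

    def __init__(self) -> None:
        # attempt number -> metric name -> value (the role of SubtaskMetricStore)
        self.by_attempt: Dict[int, Dict[str, str]] = defaultdict(dict)
        # Metrics reported as "the subtask's metrics" (current attempt only).
        self.current_view: Dict[str, str] = {}
        self.current_attempt = 0

    def set_current_attempt(self, attempt_num: int) -> None:
        """Called when fresh job details report a new current attempt."""
        self.current_attempt = attempt_num
        # Assumption: the subtask-level view is rebuilt from the new current attempt.
        self.current_view = dict(self.by_attempt.get(attempt_num, {}))

    def add(self, attempt_num: int, name: str, value: str) -> None:
        """Record a metric for one attempt, identified by its attempt number."""
        self.by_attempt[attempt_num][name] = value
        # Only the current attempt contributes to the subtask-level metrics.
        if attempt_num == self.current_attempt:
            self.current_view[name] = value


store = SubtaskMetricsSketch()
store.add(0, "numRecordsIn", "10")
store.add(1, "numRecordsIn", "7")
view_before = dict(store.current_view)

store.set_current_attempt(1)
view_after = dict(store.current_view)
print(view_before, view_after)
```

The sketch keeps every attempt's metrics addressable by attempt number, while the subtask-level view only ever reflects one attempt, which mirrors the split between SubtaskMetricStore and TaskMetricStore described above.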

ArchivedExecutionVertex

ArchivedExecutionVertex adds a current-executions collection to hold the current executions of the ExecutionVertex. The collection has only one element, the current execution attempt, when the original vertex is an ExecutionVertex, and may have multiple elements when the original vertex is a SpeculativeExecutionVertex. Either way, the collection must contain the current execution attempt. With ArchivedExecutionVertex enhanced in this way, ArchivedSpeculativeExecutionVertex can be dropped.


Compatibility, Deprecation, and Migration Plan

All proposed changes should take effect only when speculative execution is enabled, which is a new feature expected to be released along with this FLIP. This entails that Flink's default behavior won't change. 


Test Plan

We need to update the existing tests and add UI tests to make sure the new REST API works as expected.