Status

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/qycqmxbh37b5qzs72y110rp8457kkxkb
Vote thread: https://lists.apache.org/thread/xg6mtqmjbr2dloms8hh4yh30mrlkoboj

JIRA


Release: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Streaming and batch users have different interests when probing a job. While streaming users mainly care about the instantaneous status of a running job (TPS, delay, backpressure, etc.), batch users care more about the overall job status during the entire execution (queueing / execution time, total data amount, etc.).

As Flink grows into a unified streaming & batch processor and is adopted by more and more batch users, the experience of inspecting completed jobs has become more important than ever.

We compared Flink with other popular batch processors and spotted several potential improvements. Most of these improvements involve WebUI and REST API changes, which should be discussed and voted on as FLIPs. However, creating a separate FLIP for each improvement might be overkill, because the changes needed by each of them are quite small. Thus, we include all these potential improvements in this one FLIP.

Public Interfaces

This FLIP contains WebUI and REST API changes. See Proposed Changes for details.

Proposed Changes

We propose the following improvements.

  • Metrics

    • Add time breakdown metrics

    • Support metric aggregation

  • Add environmental information

  • Support viewing logs in history server

Some of the changes apply to both the JobManager and the history server, while others only affect the history server. We explain them in detail in the following sections.

In addition to the proposed improvements, we discuss how to identify a JobManager / TaskManager in the context of the history server in the last subsection.

Metrics

We propose to add the following metrics to help users understand where the time is spent.

  • Duration that a task stays in each status

    • Statuses included: CREATED, SCHEDULED, INITIALIZING, DEPLOYING, RUNNING

    • Statuses excluded (their durations are of less interest): FINISHED, CANCELING, CANCELED, RECONCILING

  • Accumulated time that a running task is busy / idle / back-pressured

Moreover, we propose to support aggregating task metrics.

  • Supported scopes: JobVertex, TaskManager

  • Supported aggregations: min, max, avg, sum, median, p25, p75, p95
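Each aggregation is computed over the values reported by the subtasks within one scope (a JobVertex or a TaskManager). The Python sketch below shows one way to compute the full set of aggregations that appears in the REST response further down. The FLIP does not say how percentiles are computed; linear interpolation between the closest ranks is our assumption, and the function names are hypothetical.

```python
from statistics import mean, median


def percentile(values, p):
    """Percentile by linear interpolation between closest ranks (an assumed method)."""
    ordered = sorted(values)
    rank = (len(ordered) - 1) * p / 100
    lower = int(rank)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (rank - lower)


def aggregate(values):
    """Aggregate one metric over all subtasks in a scope (JobVertex or TaskManager)."""
    if not values:
        raise ValueError("cannot aggregate an empty list of subtask values")
    return {
        "min": min(values),
        "max": max(values),
        "avg": mean(values),
        "sum": sum(values),
        "median": median(values),  # averages the two middle values for even counts
        "p25": percentile(values, 25),
        "p75": percentile(values, 75),
        "p95": percentile(values, 95),
    }


# Hypothetical per-subtask values of a single metric, e.g. write-records.
print(aggregate([10, 20, 30, 40]))
```

Other percentile definitions (for example nearest-rank) give slightly different p25 / p75 / p95 values for small subtask counts, so the chosen method should be documented alongside the API.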

This change applies to both jobmanager and history server.

WebUI Preview

Split the per-subtask metrics and aggregated metrics into tabs. Add a table of aggregated metrics including the following content:

  • aggregations: min, max, avg, sum, median, p25, p75, p95

  • metrics

    • status durations: CREATED, SCHEDULED, INITIALIZING, DEPLOYING, RUNNING

    • read/write records

    • read/write bytes

    • accumulated backpressured/idle/busy time


Extend the original subtasks table by adding extra columns:

  • "Accumulated Time"

  • "Status Durations" 


Under the TaskManagers tab, each TaskManager row provides an action to view its "Aggregated Metrics". The table in this modal has the same content as the one under the subtasks tab.

REST API Changes

Accumulated busy / idle / back-pressured time

We propose to introduce three new fields, "accumulated-idle-time", "accumulated-busy-time", and "accumulated-backpressured-time", to IOMetricsInfo. They represent the accumulated idle / busy / back-pressured time of a running task, respectively.

The newly introduced fields will be added to the response of the following REST APIs:

For example, the metrics field in the response of /jobs/:jobid will be changed to:

"metrics": {
    "read-bytes": 0,
    "read-bytes-complete": true,
    "write-bytes": 655360,
    "write-bytes-complete": true,
    "read-records": 0,
    "read-records-complete": true,
    "write-records": 85087,
    "write-records-complete": true,
    "accumulated-backpressured-time": 2340,
    "accumulated-idle-time": 120,
    "accumulated-busy-time": 100.0
}
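As a usage example, a client can turn the three new fields into the relative share of time a task spent busy, idle and back-pressured. This Python sketch assumes the three counters use the same time unit (the FLIP does not state the unit) and only compares them with each other; `time_breakdown` is a hypothetical helper name.

```python
import json

# The three new fields from the /jobs/:jobid example above.
body = """
{
    "accumulated-backpressured-time": 2340,
    "accumulated-idle-time": 120,
    "accumulated-busy-time": 100.0
}
"""


def time_breakdown(metrics):
    """Share of the accumulated time spent busy, idle and back-pressured."""
    fields = {
        "busy": "accumulated-busy-time",
        "idle": "accumulated-idle-time",
        "backpressured": "accumulated-backpressured-time",
    }
    # Busy time may be reported as a floating-point value, so normalize all three.
    totals = {name: float(metrics[key]) for name, key in fields.items()}
    total = sum(totals.values())
    if total == 0:
        return {name: 0.0 for name in totals}
    return {name: value / total for name, value in totals.items()}


print(time_breakdown(json.loads(body)))
```

With the example values, roughly 91% of the accumulated time is back-pressured, which is the kind of signal this FLIP wants to surface for completed batch jobs.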

Status duration of tasks

We propose to add a new field "status-duration", which represents how long a task stays in each status, to the response of the following REST APIs:

An example of the "status-duration":

"status-duration": {
    "CREATED": 12,
    "SCHEDULED": 340,
    "INITIALIZING": 1203,
    "DEPLOYING": 3210,
    "RUNNING": 304930
}
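The FLIP does not specify how the durations are derived. One plausible approach, sketched below in Python, is to record the timestamp at which a task enters each status and take the difference between consecutive transitions. The function name, the input format and the timestamps are hypothetical; the timestamps were chosen so that the result reproduces the example above.

```python
# Statuses whose durations the FLIP proposes to report.
TRACKED_STATUSES = ("CREATED", "SCHEDULED", "INITIALIZING", "DEPLOYING", "RUNNING")


def status_durations(transitions, now):
    """Build a "status-duration" map from chronologically ordered (status, entered_at) pairs.

    The last status is closed at `now`, so a still-running task reports its
    RUNNING time so far. Untracked statuses (e.g. FINISHED) are skipped.
    """
    durations = {status: 0 for status in TRACKED_STATUSES}
    # Each status ends when the next one starts; the last one ends at `now`.
    ends = [entered_at for _, entered_at in transitions[1:]] + [now]
    for (status, entered_at), left_at in zip(transitions, ends):
        if status in durations:
            durations[status] += left_at - entered_at
    return durations


# Hypothetical transition timestamps (assumed to be milliseconds).
transitions = [
    ("CREATED", 0),
    ("SCHEDULED", 12),
    ("DEPLOYING", 352),
    ("INITIALIZING", 3562),
    ("RUNNING", 4765),
    ("FINISHED", 309695),
]
print(status_durations(transitions, now=309695))
# {'CREATED': 12, 'SCHEDULED': 340, 'INITIALIZING': 1203, 'DEPLOYING': 3210, 'RUNNING': 304930}
```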

Aggregating task metrics

We propose to add aggregated task metrics, under the key "aggregated", to the response of the following REST APIs:

As shown below, the "aggregated" field contains the aforementioned aggregations of the tasks' I/O metrics (read / write bytes and records), their accumulated busy / idle / back-pressured time, and their status durations:

"aggregated": {
    "metrics": {
        "read-bytes": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        },
        "write-bytes": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        },
        "read-records": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        },
        "write-records": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        },
        "accumulated-backpressured-time": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        },
        "accumulated-idle-time": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        },
        "accumulated-busy-time": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        }
    },
    "status-duration": {
        "CREATED": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        },
        "SCHEDULED": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        },
        "INITIALIZING": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        },
        "DEPLOYING": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        },
        "RUNNING": {
            "min": 0,
            "max": 0,
            "avg": 0,
            "sum": 0,
            "median": 0,
            "p25": 0,
            "p75": 0,
            "p95": 0
        }
    }
}
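As a usage example, a client could read the "aggregated" block to see in which status subtasks spend most of their time, or to spot skew between subtasks. The helper names and the max-to-median skew ratio below are our own illustration, not part of the proposal, and the input is a trimmed, hypothetical "aggregated" block containing only the fields these helpers use.

```python
def dominant_status(aggregated, stat="avg"):
    """Return (status, value) for the status with the largest aggregated duration."""
    durations = aggregated["status-duration"]
    status = max(durations, key=lambda name: durations[name][stat])
    return status, durations[status][stat]


def skew_ratio(aggregated, metric):
    """Max-to-median ratio of a metric across subtasks; None when the median is 0."""
    stats = aggregated["metrics"][metric]
    if stats["median"] == 0:
        return None
    return stats["max"] / stats["median"]


# Hypothetical, trimmed "aggregated" block.
example = {
    "metrics": {"read-bytes": {"max": 900, "median": 100}},
    "status-duration": {
        "SCHEDULED": {"avg": 340, "max": 900},
        "RUNNING": {"avg": 304930, "max": 310000},
    },
}
print(dominant_status(example))  # ('RUNNING', 304930)
print(skew_ratio(example, "read-bytes"))  # 9.0
```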

Environmental Information

In addition to the Flink configuration, we propose to also display the environment variables and JVM arguments, and to include them in the job archive. Currently, this information can only be found in the logs, which are not always available after the job finishes (e.g., due to log rotation, or when the job is viewed from the history server).

This change applies to both jobmanager and history server.

WebUI Preview

Changes to history server:

  • Add a "Cluster Configuration" tab; its tables are scrollable for better readability.

  • The original "Configuration" tab is renamed to "Job Configuration" to avoid confusion.

In the runtime Web UI, the JobManager Configuration page is changed to the same layout.

REST API Changes

We propose to introduce REST APIs for environmental information at both the cluster level and the job level. Besides the existing /jobmanager/config, we introduce 3 REST APIs:

  • /jobmanager/environment
  • /jobs/:jobid/jobmanager/environment
  • /jobs/:jobid/jobmanager/config

For now, the cluster-level and job-level environment (and config) APIs return the same result; they may return different information in the future. The response of /jobs/:jobid/jobmanager/config is the same as that of /jobmanager/config, while the response of /jobmanager/environment and /jobs/:jobid/jobmanager/environment is shown below:

{
    "environment": [
        {"key": "FLINK_HOME", "value": "/foo/bar/flink"},
        {"key": "JAVA_HOME", "value": "/foo/bar/java"},
        ...
    ],
    "jvm": {
        "version": "OpenJDK 64-Bit Server VM 1.8.0_252-b09",
        "arch": "amd64",
        "options": [
            "-Xmx1073741824",
            "-Xms1073741824",
            ...
        ]
    },
    "classpaths": [
        "/foo/bar/dependencies",
        ...
    ]
}
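The Python sketch below shows how a client might consume the environment response. It assumes that "environment" is a JSON array of {"key", "value"} objects, and it uses the example response with the elided ("...") entries removed; `env_lookup` and `jvm_option` are hypothetical helper names.

```python
import json

# Example response from the environment API, with elided entries removed.
RESPONSE = """
{
    "environment": [
        {"key": "FLINK_HOME", "value": "/foo/bar/flink"},
        {"key": "JAVA_HOME", "value": "/foo/bar/java"}
    ],
    "jvm": {
        "version": "OpenJDK 64-Bit Server VM 1.8.0_252-b09",
        "arch": "amd64",
        "options": ["-Xmx1073741824", "-Xms1073741824"]
    },
    "classpaths": ["/foo/bar/dependencies"]
}
"""


def env_lookup(response):
    """Turn the list of {"key": ..., "value": ...} items into a plain dict."""
    return {item["key"]: item["value"] for item in response["environment"]}


def jvm_option(response, prefix):
    """Return the first JVM option starting with `prefix`, or None if absent."""
    return next((opt for opt in response["jvm"]["options"] if opt.startswith(prefix)), None)


info = json.loads(RESPONSE)
print(env_lookup(info)["FLINK_HOME"])  # /foo/bar/flink
print(jvm_option(info, "-Xmx"))  # -Xmx1073741824
```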

Logs

Currently, the Flink history server does not support viewing logs. This is mostly because logs are usually too large to be included in the job archives.

We believe most users who run Flink in production have their own mechanisms for collecting logs from terminated Flink instances, which usually depend heavily on company-internal infrastructure. Therefore, instead of building yet another log-collection mechanism and expecting users to migrate to it, we decided to provide a way for users to easily integrate their existing log-viewing service into the history server.

We propose to introduce a configuration option "historyserver.log.[jobmanager|taskmanager].url-pattern" that allows users to define a URL pattern pointing to an external log-viewing service. The Flink history server will replace the special placeholders (<jobid> and <tmid>) in this pattern with the proper values to generate the actual URLs.
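A minimal Python sketch of the placeholder substitution follows. The pattern values and the logs.example.com host are hypothetical, and we assume each placeholder is replaced by the plain job ID / TaskManager ID string without further encoding; the FLIP only specifies the option name and the placeholders.

```python
# Hypothetical values for the two options, e.g. in the Flink configuration:
#   historyserver.log.jobmanager.url-pattern:  http://logs.example.com/<jobid>/jobmanager
#   historyserver.log.taskmanager.url-pattern: http://logs.example.com/<jobid>/<tmid>
JM_PATTERN = "http://logs.example.com/<jobid>/jobmanager"
TM_PATTERN = "http://logs.example.com/<jobid>/<tmid>"


def render_log_url(pattern, job_id, tm_id=None):
    """Replace <jobid> (and <tmid>, if present) in a log URL pattern."""
    url = pattern.replace("<jobid>", job_id)
    if "<tmid>" in url:
        if tm_id is None:
            raise ValueError("pattern contains <tmid> but no TaskManager id was given")
        url = url.replace("<tmid>", tm_id)
    return url


print(render_log_url(JM_PATTERN, "abc123"))  # http://logs.example.com/abc123/jobmanager
print(render_log_url(TM_PATTERN, "abc123", "tm-1"))  # http://logs.example.com/abc123/tm-1
```

Failing loudly on an unresolved <tmid> avoids handing users a URL that still contains a literal placeholder.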

For the JobManager (i.e., the runtime Web UI rather than the history server), the log-url REST APIs return the URLs of the existing JobManager and TaskManager log pages.

WebUI Preview

Job status information is rearranged into a clearer structure, with an additional "actions" section containing a button that navigates to the JobManager log.

  • In the history server, this button redirects to the external JobManager log URL.

In the history server, the subtasks and TaskManagers of a job vertex now support navigating to the external TaskManager log. The action button is disabled while the HTTP request is loading or if it has failed.

In the runtime Web UI, the action remains unchanged: it redirects to the TaskManager log page.

REST API Changes

We propose to introduce "/jobs/:jobid/jobmanager/log-url" and "/jobs/:jobid/taskmanagers/:taskmanagerid/log-url", which return the generated log URL of the JobManager / TaskManager for the given job. The response is shown below:

{
  "url": "http://localhost:8081/task-manager/taskmanager-1/logs"
}
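On the client side, using these endpoints amounts to building the request path and reading the "url" field of the response. In the Python sketch below, the helper names are hypothetical, and percent-encoding the IDs is a defensive choice on our part rather than something the FLIP specifies.

```python
import json
from urllib.parse import quote


def log_url_path(job_id, taskmanager_id=None):
    """Path of the proposed log-url endpoint for the JobManager or a TaskManager of a job."""
    base = f"/jobs/{quote(job_id, safe='')}"
    if taskmanager_id is None:
        return f"{base}/jobmanager/log-url"
    # Encode defensively in case an ID contains characters such as ':' or '/'.
    return f"{base}/taskmanagers/{quote(taskmanager_id, safe='')}/log-url"


def parse_log_url(body):
    """Extract the generated log URL from the endpoint's JSON response body."""
    return json.loads(body)["url"]


print(log_url_path("abc123"))  # /jobs/abc123/jobmanager/log-url
print(log_url_path("abc123", "taskmanager-1"))  # /jobs/abc123/taskmanagers/taskmanager-1/log-url
print(parse_log_url('{"url": "http://localhost:8081/task-manager/taskmanager-1/logs"}'))
```

The path is appended to the base address of the JobManager or history server REST endpoint, and the returned URL can then be opened directly in a browser.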

Compatibility, Deprecation, and Migration Plan

Changes proposed in this FLIP are backward compatible. There's no deprecation, and no migration needed.

Test Plan

  • Backend changes will be covered by unit tests.

  • Frontend changes will be manually verified.

Rejected Alternatives

We rejected two alternatives for identifying the JM / TM in the history server REST API.

  • Using a cluster-id instead of a job-id in the URLs. This was rejected because it adds complexity on the user side: users would have to understand an additional concept. Instead, the mapping from a job-id to the specific cluster / JM / TM can be handled by the backend.
  • Using different URLs for the same API, i.e., prefixing the JM / TM related APIs with "/jobs/:jobid/" for the history server. This was rejected because it breaks the simple property that the history server REST APIs are a subset of the JobManager REST APIs. Moreover, the current approach is more flexible if we later want to return different job-level and cluster-level JM / TM information.