Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state[Under Discussion]

...

Page properties


Discussion thread
Vote threadhttps://

...

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14143

16050

Release


Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

According to the documentation, a subtask may have more than one execution attempt. However, the REST API provides no way to retrieve a subtask's attempt history, so users cannot tell whether the subtask has failed before.

Image Modified

On the timeline page, the Web UI can only fetch and visualize the timeline of a subtask's latest execution attempt. The current REST API provides no way to fetch the timeline of a failed attempt.

Proposed Changes

Add the attempt history:

  • Add the attempt history under the subtasks drawer on the job vertex page.
  • Display the timelines of all attempts.

Frontend Design

Add a "View attempt history" menu item to each subtask. When the user clicks it, a modal pops up showing the subtask's attempt history.

Image Added

When the user clicks the vertex timeline, display the timelines of all subtask attempts, each labeled as subtaskId-host-attemptId.

Image Added

Image RemovedImage Added

REST API Design

   

  • Add a method to ArchivedExecutionVertex that returns its prior executions.
  • Get the prior execution attempts:

...

AccessExecution execution = executionVertex.getCurrentExecutionAttempt();

int currentAttemptNum = execution.getAttemptNumber();

JobID jobID = request.getPathParameter(JobIDPathParameter.class);

JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);

List<SubtaskExecutionAttemptDetailsInfo> allAttempts = new ArrayList<>();

allAttempts.add(SubtaskExecutionAttemptDetailsInfo.create(execution, metricFetcher, jobID, jobVertexID));

if (currentAttemptNum > 0) {

  for (int i = currentAttemptNum - 1; i >= 0; i--) {

    AccessExecution currentExecution = executionVertex.getPriorExecutionAttempt(i);

    if (currentExecution != null) {

      allAttempts.add(SubtaskExecutionAttemptDetailsInfo.create(currentExecution, metricFetcher, jobID, jobVertexID));

    }

  }

}

  • Prior attempts are obtained via ArchivedExecutionVertex.getPriorExecutionAttempts().
  • Add a SubtaskAllExecutionAttemptsDetailsHandler that returns details for all attempts, so that failed attempts are exposed.
  • URL: /jobs/:jobid/vertices/:vertexid/subtasks/:subtaskIndex/attempts
  • Response:

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskAllExecutionAttemptsDetailsInfo",
  "properties" : {
    "attempts" : {
      "type" : "array",
      "items" : {
        "type" : "object",
        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtaskExecutionAttemptDetailsInfo",
        "properties" : {
          "subtask" : {
            "type" : "integer"
          },
          "status" : {
            "type" : "string",
            "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
          },
          "attempt" : {
            "type" : "integer"
          },
          "host" : {
            "type" : "string"
          },
          "start-time" : {
            "type" : "integer"
          },
          "end-time" : {
            "type" : "integer"
          },
          "duration" : {
            "type" : "integer"
          },
          "metrics" : {
            "type" : "object",
            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
            "properties" : {
              "read-bytes" : {
                "type" : "integer"
              },
              "read-bytes-complete" : {
                "type" : "boolean"
              },
              "write-bytes" : {
                "type" : "integer"
              },
              "write-bytes-complete" : {
                "type" : "boolean"
              },
              "read-records" : {
                "type" : "integer"
              },
              "read-records-complete" : {
                "type" : "boolean"
              },
              "write-records" : {
                "type" : "integer"
              },
              "write-records-complete" : {
                "type" : "boolean"
              }
            }
          }
        }
      }
    }
  }
}

  • Add a Collection<SubtaskTimeInfo> to the SubtasksTimesHandler result, built from the ExecutionVertex's prior executions.
  • URL: /jobs/:jobid/vertices/:vertexid/subtasktimes
  • Response:

                     }

                 }

             },

             "taskmanager-id" : {
                 

{

  "type" : "object",

  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:SubtasksTimesInfo",

  "properties" : {

    "id" : {

      

"type" : "string"

    

             },

    "name

            "start_time" : {

      

                 "type" : "

string"

integer"
              }
           }
        }
     }
   }
}

  • In the 'subtasks' array, each element is a SubtaskTimeInfo object with a single added field, 'attempt'.
  • Add a query parameter show-history, which defaults to false. If show-history is true, information for all attempts,
    including previous ones, is returned.
  • URL: /jobs/:jobid/vertices/:vertexid/subtasktimes?show-history=true
  • Response:


{

   

    },

    "now" : {

      "type" : "integer"

    },

    "subtasks" : {

      "type" : "array",

      "items" : {

        

"type" : "object",

        

   "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:SubtasksTimesInfo

:SubtaskTimeInfo

",

        

   "properties" : {

          "subtask

       "id" : {

            

       "type" : "

integer

string"

          

     },

          

     "

host

name" : {

            

         "type" : "string"

          

      },

          

      "

duration

now" : {

            

          "type" : "integer"

          

      },

          

      "

timestamps

subtasks" : {

            "type" : "object",

            "additionalProperties" : {

              "type" : "integer"

            }

          },

          "attempt-num": {

            "type" : "integer"

          },

          "attempts-time-info": {

            "type": "array",

          "type" : "array",

         

            

"items" : {

              

             "type" : "object",

              

             "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:

SubtaskAttemptTimeInfo

SubtasksTimesInfo:SubtaskTimeInfo",

              

             "properties" : {

                

                "subtask" : {

                  

                    "type" : "integer"

                

                 },

                

                 "host" : {

                  

                     "type" : "string"

                

                  },

                

                  "duration" : {

                  

                      "type" : "integer"

                

                    },

                

                    "timestamps" : {

                  

                       "type" : "object",

                  

                       "additionalProperties" : {

                    

                           "type" : "integer"

                  }

                },

                "attempt-num": {

                        }

                    },

                    "attempt": {

                         

                  

"type" : "integer"

                },

              }

            }

          } 

                    }

         

        

}

      }

    }

  }

}



Test Plan

Everything can be tested with unit tests.