TODOs:

  • Add Document Scope
  • Complete the descriptions of the works
  • Add what happens in case of:
    • Node heartbeat loss aka node failure
    • Double registration (logical node failure and re-registration)

The job submission flow:

  1. Create JobSpecification
  2. Call IHyracksClientConnection.startJob
  3. Create JobSpecificationActivityClusterGraphGeneratorFactory
  4. Create a StartJobFunction and send it to the Cluster Controller's ClientInterfaceIPCI, which runs in the network thread
  5. Create JobId
  6. Schedule JobStartWork to run on the Cluster Controller work queue
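
A minimal client-side sketch of steps 1 and 2 above, assuming a recent Apache Hyracks package layout and a Cluster Controller reachable at cc-host:1098 (both assumptions, not taken from this page); a real job would add operators and connectors to the specification:

  import org.apache.hyracks.api.client.HyracksConnection;
  import org.apache.hyracks.api.client.IHyracksClientConnection;
  import org.apache.hyracks.api.job.JobId;
  import org.apache.hyracks.api.job.JobSpecification;

  public class SubmitJobExample {
      public static void main(String[] args) throws Exception {
          // 1. Build the JobSpecification (operator and connector wiring omitted here).
          JobSpecification spec = new JobSpecification();

          // 2. startJob ships the spec to the Cluster Controller, which creates the JobId
          //    and schedules JobStartWork; the JobId is returned to the client.
          IHyracksClientConnection hcc = new HyracksConnection("cc-host", 1098);
          JobId jobId = hcc.startJob(spec);

          // Optionally block until the Cluster Controller reports the job as finished.
          hcc.waitForCompletion(jobId);
      }
  }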

[JobStartWork]

  • Created when StartJobFunction is submitted.
  • JobStartWork creates a job run and passes it to the Cluster Controller's Job Manager
  • The Job Manager decides either to queue the job or to execute it immediately, based on the cluster's capacity and the job's requirements (see the sketch after this list)
    • Queue:
      • Set status to PENDING
      • Add the job to the job queue
    • Execute:
      • Put the job in the active run map.
      • Notify listeners of <<job creation>>
      • Set status to RUNNING
      • Start job through executor
        • Start runnable activity clusters
          • If there are no more runnable task clusters and no ongoing clusters, schedule JobCleanupWork with a terminated-successfully status!?
          • Get runnable clusters and order tasks
          • Mark runnable tasks as in progress
          • Start tasks:
            • For each node in the runnable task clusters, send a start-tasks request (note: this can fail at any time)
        • Notify listeners of <<job start>>
      • On any failure during job start, we set the job status to FAILURE with the exception raised while starting the job, then call abortJob
  • Return the JobId to the user
  • If all goes well, we wait for events. Otherwise, we abort the started job as follows:
    • Get all task clusters marked as ongoing
    • For each node in each cluster, send an abort request (log and ignore on failure)
    • Remove the task cluster from in-progress
    • Remove partitions and partition requests for the tasks
    • Set the task cluster status to failed or aborted
    • Ensure the set of in-progress task clusters is empty
    • Schedule JobCleanupWork
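
A much-simplified sketch of the queue-or-execute decision made by the Job Manager above. The class names, the single requiredCores number, and the capacity model are all made up for illustration; the real Job Manager uses a richer resource model:

  import java.util.ArrayDeque;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.Queue;

  class SimpleJobManager {
      enum Status { PENDING, RUNNING }

      static class JobRun {
          final long jobId;
          final int requiredCores;   // stand-in for the job's resource requirements
          Status status;
          JobRun(long jobId, int requiredCores) { this.jobId = jobId; this.requiredCores = requiredCores; }
      }

      private int availableCores;                                    // stand-in for cluster capacity
      private final Queue<JobRun> jobQueue = new ArrayDeque<>();
      private final Map<Long, JobRun> activeRunMap = new HashMap<>();

      SimpleJobManager(int availableCores) { this.availableCores = availableCores; }

      // Mirrors JobStartWork handing the freshly created job run to the Job Manager.
      void add(JobRun run) {
          if (run.requiredCores > availableCores) {
              run.status = Status.PENDING;        // queue: not enough capacity right now
              jobQueue.add(run);
          } else {
              activeRunMap.put(run.jobId, run);   // execute immediately
              // notify listeners of <<job creation>> here
              run.status = Status.RUNNING;
              availableCores -= run.requiredCores;
              // the executor would start the runnable activity clusters here
          }
      }
  }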


Q. What kinds of events can we get at the cluster controller and how do we handle each?

A. All events arrive first at one of the IPCIs:

  • ClientInterfaceIPCI
    • CANCEL_JOB:
      • Submitted when a user cancels a job or the job times out
      • Create CancelJobWork and schedule it through the Cluster Controller work queue
    • WAIT_FOR_COMPLETION:
      • Submitted when the user wants to block until the job completes
      • Create WaitForJobCompletionWork and schedule it through the Cluster Controller work queue
  • ClusterControllerIPCI
    • NOTIFY_JOBLET_CLEANUP:
      • Sent from a Node Controller when:
        • At the end of CleanupJobletWork
        • At the end of NotifyTaskCompleteWork
        • At the end of NotifyTaskFailureWork [only if creation of the task was successful]
      • Create JobletCleanupNotificationWork and schedule it through the Cluster Controller work queue
    • NOTIFY_TASK_COMPLETE:
      • Sent from NotifyTaskCompleteWork at a node controller [before sending NOTIFY_JOBLET_CLEANUP]
      • Create TaskCompleteWork and schedule it through the Cluster Controller work queue
    • NOTIFY_TASK_FAILURE:
      • Sent from NotifyTaskFailureWork at a node controller [before sending NOTIFY_JOBLET_CLEANUP]
      • Create TaskFailureWork and schedule it through the Cluster Controller work queue
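
All of the handlers above follow the same pattern: the IPCI decodes the incoming function on the network thread and immediately schedules a matching Work item on the Cluster Controller's work queue, so no real processing happens on the network thread. A rough sketch of that pattern (the class and method names here are illustrative, not the actual Hyracks classes):

  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.LinkedBlockingQueue;

  class WorkQueueSketch {
      interface Work { void run(); }

      static class WorkQueue {
          private final BlockingQueue<Work> queue = new LinkedBlockingQueue<>();
          void schedule(Work w) { queue.add(w); }        // called from the network thread
          void runLoop() throws InterruptedException {   // a dedicated work thread drains the queue
              while (true) { queue.take().run(); }
          }
      }

      enum FunctionId { CANCEL_JOB, WAIT_FOR_COMPLETION, NOTIFY_TASK_COMPLETE /* ... */ }

      // Stands in for the IPCI's message handler.
      static void onMessage(WorkQueue workQueue, FunctionId fid) {
          switch (fid) {
              case CANCEL_JOB:
                  workQueue.schedule(() -> { /* CancelJobWork body */ });
                  break;
              case WAIT_FOR_COMPLETION:
                  workQueue.schedule(() -> { /* WaitForJobCompletionWork body */ });
                  break;
              default:
                  workQueue.schedule(() -> { /* other Work types */ });
          }
      }
  }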


Q. What kinds of events can we get at a node controller and how do we handle each?

A. All events arrive first at the Node Controller's IPCI:

  • START_TASKS
    • Comes from the cluster controller as part of:
      • Start Job
      • Notify task complete
      • Notify task failure?
      • Notify node failures?
    • Create StartTasksWork and schedule it through the Node Controller work queue
  • ABORT_TASKS
    • Comes from the CC when a job needs to be aborted
    • Create AbortTasksWork and schedule it through the Node Controller work queue
  • CLEANUP_JOBLET
    • Comes from the CC when a joblet needs to be cleaned up
    • Create CleanupJobletWork and schedule it through the Node Controller work queue


The CC works:

[CancelJobWork]

  • Created when CancelJobFunction is submitted

  • Calls jobManager.cancel
  • If the job has already started, we get the executor and call cancelJob
    • If the job has already failed or terminated, we do nothing!?
    • Else, abort the ongoing task clusters [sending abort-tasks requests to the NCs]... Note: those requests can arrive before or after task completion
    • Then abort the job, which again aborts the task clusters and then schedules JobCleanupWork
  • Else, we remove the job from the job queue, set its status to FAILURE due to cancellation, and archive it
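
A rough sketch of the cancellation branches above; the types and method bodies are placeholders for the real Job Manager / Job Executor logic:

  class CancelSketch {
      enum Status { PENDING, RUNNING, TERMINATED, FAILURE }

      static class JobRun { Status status; }

      void cancel(long jobId, JobRun run, boolean started) {
          if (started) {
              if (run.status == Status.TERMINATED || run.status == Status.FAILURE) {
                  return;                        // already finished; nothing to cancel
              }
              abortOngoingTaskClusters(run);     // sends abort-tasks requests to the NCs
              abortJob(run);                     // aborts again, then schedules JobCleanupWork
          } else {
              removeFromQueue(jobId);            // still queued: drop it
              run.status = Status.FAILURE;       // failed due to cancellation
              archive(run);
          }
      }

      void abortOngoingTaskClusters(JobRun run) { }
      void abortJob(JobRun run) { }
      void removeFromQueue(long jobId) { }
      void archive(JobRun run) { }
  }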

[WaitForJobCompletionWork]

  • Created when WaitForCompletionFunction is submitted
  • Get JobRun from JobManager
  • If found, wait for its completion in an executor thread (keep waiting while the status is neither terminated nor failed)
  • Else, get the run from history, set the value, then set the exception. This looks wrong because we should set one or the other, but not both??? (see the illustration below)
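
For intuition on why setting both looks wrong, here is the behavior of a plain java.util.concurrent.CompletableFuture (used purely as an analogy, not the actual Hyracks callback type): only the first completion wins, so the exception would be silently dropped:

  import java.util.concurrent.CompletableFuture;

  public class DoubleCompletionDemo {
      public static void main(String[] args) throws Exception {
          CompletableFuture<String> result = new CompletableFuture<>();
          result.complete("job result");                         // set the value first
          boolean accepted = result.completeExceptionally(new IllegalStateException("job not found"));
          System.out.println(accepted);       // false: the exception was ignored
          System.out.println(result.get());   // prints "job result"
      }
  }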

[JobCleanupWork]

  • Created when:
    • JobExecutor.abortJob which is called from:
      • CancelJobWork
      • JobStartWork
      • RegisterNodeWork
      • RemoveDeadNodesWork
      • TaskFailureWork
    • JobExecutor.startRunnableActivityClusters which is called from:
      • RegisterNodeWork
      • RemoveDeadNodesWork
      • TaskCompleteWork
      • TaskFailureWork
      • JobStartWork
  • Get the JobRun
  • PrepareComplete()
    • If the status is FAILURE_BEFORE_EXECUTION, we do finalComplete
    • Else, if the status has been set before, we only call finalComplete if all nodes have completed cleanup
    • Else: set the job status and, for every node that is still part of the system, call cleanUpJoblet
    • If all nodes have left, then call finalComplete
    • If the job has no target nodes, call finalComplete
    • finalComplete notifies <<job finish>>
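
A rough sketch of the prepareComplete branching above; the real Job Manager tracks richer state, and all names here are simplified placeholders:

  import java.util.HashSet;
  import java.util.Set;

  class CleanupSketch {
      enum JobStatus { RUNNING, FAILURE_BEFORE_EXECUTION, TERMINATED, FAILURE }

      static class JobRun {
          JobStatus pendingStatus;                               // status recorded for cleanup, if any
          final Set<String> participatingNodes = new HashSet<>();
          final Set<String> pendingCleanupNodes = new HashSet<>();
      }

      void prepareComplete(JobRun run, JobStatus status, Set<String> liveNodes) {
          if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
              finalComplete(run);                                // nothing was started; finish immediately
              return;
          }
          if (run.pendingStatus != null) {                       // status was already set earlier
              if (run.pendingCleanupNodes.isEmpty()) {
                  finalComplete(run);
              }
              return;
          }
          run.pendingStatus = status;
          for (String node : run.participatingNodes) {
              if (liveNodes.contains(node)) {                    // only nodes still part of the system
                  run.pendingCleanupNodes.add(node);
                  // send CLEANUP_JOBLET to this node here
              }
          }
          if (run.pendingCleanupNodes.isEmpty()) {               // all nodes left, or no target nodes
              finalComplete(run);
          }
      }

      void finalComplete(JobRun run) { /* notify <<job finish>>, archive the run */ }
  }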


[JobletCleanupNotificationWork]

  • Created when a NotifyJobletCleanupFunction is submitted to the cluster controller
  • NotifyJobletCleanupFunction is submitted:
    • At the end of CleanupJobletWork
    • At the end of NotifyTaskCompleteWork
    • At the end of NotifyTaskFailureWork
  • Removes the node from the list of pendingCleanupNodes
  • Removes the job from the list of active jobs on the node
  • If the list of nodes pending cleanup is empty, we call jobManager.finalComplete

[TaskCompleteWork]

  • Created when a NotifyTaskCompleteFunction is submitted to the cluster controller
  • NotifyTaskCompleteFunction is submitted as part of NotifyTaskCompleteWork
  • Possibly updates stats
  • Call jobExecutor.notifyTaskComplete:
    • If the number of pending tasks is 0:
      • Set the task cluster status to COMPLETED
      • Remove the task cluster from the in-progress task clusters
      • Start the next activity cluster
  • In certain cases, we log:
    • "Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt
    • "Spurious task complete notification: " + taId + " Current state = " + taStatus
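
A rough sketch of the pending-task bookkeeping above; the real JobExecutor works on task cluster attempt objects, reduced here to a counter and a status field:

  import java.util.HashSet;
  import java.util.Set;

  class TaskCompleteSketch {
      enum Status { RUNNING, COMPLETED, FAILED, ABORTED }

      static class TaskClusterAttempt {
          int pendingTaskCount;
          Status status = Status.RUNNING;
      }

      private final Set<TaskClusterAttempt> inProgressTaskClusters = new HashSet<>();

      void notifyTaskComplete(TaskClusterAttempt attempt) {
          if (attempt.status != Status.RUNNING) {
              // the "spurious task complete notification" case: the attempt already finished or aborted
              return;
          }
          if (--attempt.pendingTaskCount == 0) {
              attempt.status = Status.COMPLETED;
              inProgressTaskClusters.remove(attempt);
              startRunnableActivityClusters();   // move on to the next activity cluster
          }
      }

      void startRunnableActivityClusters() { /* schedule the next runnable task clusters */ }
  }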

[TaskFailureWork]

  • Created when a NotifyTaskFailureFunction is submitted to the cluster controller
  • NotifyTaskFailureFunction is submitted as part of NotifyTaskFailureWork
  • Report job failure to dataset directory service
  • Get job executor and notify task failure
    • Set attempt status as FAILED
    • Abort task
    • Abort other doomed tasks
    • If the maximum number of re-attempts has been reached, abort the job
    • Else, startRunnableActivityClusters
    • On any failure that happens during the task failure work, we abort the job (see the sketch below)
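
A rough sketch of the retry decision above. MAX_REATTEMPTS and all names are illustrative; the real JobExecutor tracks attempts per task cluster and also aborts dependent ("doomed") tasks:

  class TaskFailureSketch {
      static final int MAX_REATTEMPTS = 2;   // assumed limit; configurable in the real system

      enum Status { RUNNING, FAILED, ABORTED }

      static class TaskClusterAttempt {
          int attemptNumber;
          Status status = Status.RUNNING;
      }

      void notifyTaskFailure(TaskClusterAttempt attempt, Exception cause) {
          try {
              attempt.status = Status.FAILED;
              abortTaskCluster(attempt);                   // abort the failed task and its doomed siblings
              if (attempt.attemptNumber >= MAX_REATTEMPTS) {
                  abortJob(cause);                         // give up on the whole job
              } else {
                  startRunnableActivityClusters();         // re-plan and retry the work
              }
          } catch (Exception e) {
              abortJob(e);                                 // a failure during failure handling aborts the job
          }
      }

      void abortTaskCluster(TaskClusterAttempt attempt) { }
      void abortJob(Exception cause) { }
      void startRunnableActivityClusters() { }
  }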

The NC works:

[StartTasksWork]

  • Created when a StartTasksFunction is submitted to a node controller and run on the work queue

  • Create the set of tasks and start them. If starting any task fails, we submit a NotifyTaskFailureWork to the Node Controller queue; some tasks may already have been started successfully when that happens (see the sketch below)
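
A rough sketch of the partial-failure situation above: tasks are started one by one, so a failure midway leaves earlier tasks running while the failure notification is queued. The names are illustrative, not the actual Hyracks classes:

  import java.util.ArrayList;
  import java.util.List;

  class StartTasksSketch {
      interface Task { void start() throws Exception; }

      void startTasks(List<Task> tasks) {
          List<Task> started = new ArrayList<>();
          try {
              for (Task t : tasks) {
                  t.start();           // may throw at any point
                  started.add(t);
              }
          } catch (Exception e) {
              // Tasks in 'started' keep running; the CC will abort them later when it
              // processes the failure. Here we only enqueue the failure notification.
              scheduleNotifyTaskFailureWork(e);
          }
      }

      void scheduleNotifyTaskFailureWork(Exception cause) { /* push onto the NC work queue */ }
  }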

[AbortTasksWork]

  • Created when an AbortTasksFunction is submitted to a node controller and run on the work queue

  • Gets the DatasetPartitionManager and aborts the reader for the job
  • Get the joblet (if still running) and abort all of its tasks through thread interrupts

[CleanupJobletWork]

  • Created when a CleanupJobletFunction is submitted to a node controller and run on the work queue

  • CleanupJobletFunction is created in JobManager.prepareComplete
  • Get the Node Partition Manager and unregister partitions of the job
  • On an executor thread, deallocate partitions
  • Remove the joblet from the joblet map
  • Call joblet.cleanup

[NotifyTaskCompleteWork]

  • Created as the last step of a task that ran successfully, and scheduled to run on the Node Controller work queue
  • Notify Cluster Controller of task completion (log on failure)
  • Removes the task from the joblet

[NotifyTaskFailureWork]

  • Created in:
    • At the start of a task, if the task was aborted before its thread was registered
    • The last step of a failing task
    • The last step if creating or starting a task failed in StartTasksWork
  • Gets the Node Partition Manager and aborts the reader for the job
  • Notify Cluster Controller of task failure (log on failure)
  • Removes the task from the joblet

