TODOs:
- Add Document Scope
- Complete the work descriptions
- Add what happens in case of:
- Node heartbeat loss aka node failure
- Double registration (logical node failure and re-registration)
- Create JobSpecification
- Call IHyracksClientConnection.startJob
- Create JobSpecificationActivityClusterGraphGeneratorFactory
- Create StartJobFunction and send it to the Cluster Controller's ClientInterfaceIPCI, which runs in the network thread
- Create JobId
- Schedule JobStartWork to run on the Cluster Controller work queue
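The submission path above can be sketched as a toy model. Every class and method name below is illustrative shorthand for the Hyracks components named in these notes, not the real API; the point is only the shape of the flow: the network thread does the minimum (assign a JobId, enqueue the work), and the heavy lifting runs later on the CC work queue.

```python
from collections import deque

class ClusterController:
    """Toy model of the CC side of job submission (illustrative only)."""
    def __init__(self):
        self.work_queue = deque()   # single-consumer CC work queue
        self.next_job_id = 0

    def start_job_function(self, acgg_factory):
        # Runs in the network thread: create a JobId and defer the
        # real work (JobStartWork) to the work queue.
        job_id = self.next_job_id
        self.next_job_id += 1
        self.work_queue.append(("JobStartWork", job_id, acgg_factory))
        return job_id

def start_job(cc, job_specification):
    # Client side: wrap the spec in an ACGG-generator factory and
    # send a StartJobFunction to the CC's client interface.
    factory = ("ACGGGeneratorFactory", job_specification)
    return cc.start_job_function(factory)

cc = ClusterController()
job_id = start_job(cc, {"operators": [], "connectors": []})
```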
[JobStartWork]
- Created when StartJobFunction is submitted.
- JobStartWork creates a job run and passes it to the Cluster Controller's Job Manager
- The Job Manager decides either to queue the job or to execute it immediately, based on the cluster's available capacity and the job's required capacity
- Queue:
- Set status to PENDING
- Add the job to the job queue
- Execute:
- Put the job in the active run map.
- Notify listeners of <<job creation>>
- Set status to RUNNING
- Start job through executor
- Start runnable activity clusters
- If there are no more runnable task clusters and no clusters in progress, schedule JobCleanupWork with status TERMINATED (i.e., the job completed successfully)
- Get runnable clusters and order tasks
- Mark runnable tasks as in progress
- Start tasks:
- For each node in runnable task clusters, send start tasks requests (Note: this can fail at any time)
- Notify listeners of <<job start>>
- Start runnable activity clusters
- On any failure during job start, we set the job status to FAILURE with the exception raised while starting the job, then call abortJob
- Return the JobId to the user
- If all goes well, we wait for events. Otherwise, we abort the started job:
- Get all task clusters marked as ongoing
- For each node in each cluster, send an abort request; on failure, log and ignore
- Remove task cluster from in-progress
- Remove partitions and partition requests for the tasks
- Set task cluster status as failed or aborted
- Ensure the set of in-progress task clusters is empty
- Schedule JobCleanupWork
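The queue-or-execute decision described above can be modeled as a minimal sketch. Capacity is modeled here as a single number for brevity (the real accounting is richer), and all names are illustrative:

```python
class JobManager:
    """Sketch of the queue-or-execute admission decision (illustrative)."""
    def __init__(self, cluster_capacity):
        self.available = cluster_capacity
        self.job_queue = []      # PENDING jobs, waiting for capacity
        self.active_runs = {}    # jobs currently executing

    def add(self, job_id, required_capacity):
        if required_capacity <= self.available:
            # Enough capacity: put the job in the active run map,
            # notify listeners of <<job creation>>, start via executor.
            self.available -= required_capacity
            self.active_runs[job_id] = "RUNNING"
            return "RUNNING"
        # Not enough capacity: set status to PENDING and queue it.
        self.job_queue.append(job_id)
        return "PENDING"

jm = JobManager(cluster_capacity=8)
```

A queued job would later be re-examined when capacity is released (e.g., on job completion); that path is omitted here.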
Q. What kinds of events can we get at the cluster controller and how do we handle each?
A. All events arrive first at one of the IPC interfaces (IPCIs):
- ClientInterfaceIPCI
- CANCEL_JOB:
- Submitted when a user cancels a job or it times out
- Create CancelJobWork and schedule it through the Cluster Controller work queue
- WAIT_FOR_COMPLETION:
- Submitted if the user wants immediate results
- Create WaitForJobCompletionWork and schedule it through the Cluster Controller work queue
- ClusterControllerIPCI
- NOTIFY_JOBLET_CLEANUP:
- Sent from a Node Controller when:
- At the end of CleanupJobletWork
- At the end of NotifyTaskCompleteWork
- At the end of NotifyTaskFailureWork [only if creation of the task was successful]
- Create JobletCleanupNotificationWork and schedule it through the Cluster Controller work queue
- NOTIFY_TASK_COMPLETE
- Sent from NotifyTaskCompleteWork at a node controller [before sending NOTIFY_JOBLET_CLEANUP]
- Create TaskCompleteWork and schedule it through the Cluster Controller work queue
- NOTIFY_TASK_FAILURE
- Sent from NotifyTaskFailureWork at a node controller [before sending NOTIFY_JOBLET_CLEANUP]
- Create TaskFailureWork and schedule it through the Cluster Controller work queue
Q. What kinds of events can we get at a node controller and how do we handle each?
- START_TASKS
- Comes from the cluster controller as part of:
- Start Job
- Notify task complete
- Notify task failure?
- Notify node failures?
- Create StartTasksWork and schedule it through the Node Controller work queue
- ABORT_TASKS
- Comes from the CC when a job needs to be aborted
- Create AbortTasksWork and schedule it through the Node Controller work queue
- CLEANUP_JOBLET
- Comes from the CC when a joblet needs to be cleaned up
- Create CleanupJobletWork and schedule it through the Node Controller work queue
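Both controllers follow the same dispatch pattern: the IPCI runs on the network thread and translates each incoming function into a Work object, which is then executed on the controller's work queue. A hypothetical sketch of the NC side (the table and class are illustrative, not the real Hyracks dispatch code):

```python
from collections import deque

# Hypothetical function-id -> Work-name table, mirroring the notes above
DISPATCH = {
    "START_TASKS": "StartTasksWork",
    "ABORT_TASKS": "AbortTasksWork",
    "CLEANUP_JOBLET": "CleanupJobletWork",
}

class NodeController:
    def __init__(self):
        self.work_queue = deque()

    def deliver(self, function_id, payload):
        # Network thread: do no real work here, just schedule it.
        work = DISPATCH[function_id]
        self.work_queue.append((work, payload))

nc = NodeController()
nc.deliver("START_TASKS", {"job": 7})
nc.deliver("ABORT_TASKS", {"job": 7})
```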
The Cluster Controller (CC) works:
[CancelJobWork]
- Created when a CancelJobFunction is submitted
- Calls jobManager.cancel
- If the job has already started, we get the executor and call cancelJob
- If the job has already failed or terminated, we do nothing (cancellation is a no-op)
- Else, abort the ongoing task clusters [sending abort-tasks requests to the NCs]... Note: those can arrive before or after task completion
- Then abort the job, which again aborts task clusters and then schedules JobCleanupWork
- Else, we remove the job from the job queue, set its status to FAILURE (due to cancellation), and archive it
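The three cancellation branches above can be captured in a small sketch (a pure-function model with illustrative names; the real jobManager.cancel mutates CC state and talks to the NCs):

```python
def cancel_job(job_id, active_runs, job_queue, archive):
    """Sketch of the cancel branches: started, finished, or still queued."""
    if job_id in active_runs:
        status = active_runs[job_id]
        if status in ("FAILURE", "TERMINATED"):
            return "noop"          # already finished: nothing to do
        return "abort"             # started: abort ongoing task clusters
    if job_id in job_queue:
        job_queue.remove(job_id)   # still queued: it never ran
        archive[job_id] = "FAILURE"  # failed due to cancellation
        return "dequeued"
    return "unknown"

active_runs = {"j1": "RUNNING", "j2": "TERMINATED"}
job_queue = ["j3"]
archive = {}
r1 = cancel_job("j1", active_runs, job_queue, archive)
r3 = cancel_job("j3", active_runs, job_queue, archive)
```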
[WaitForJobCompletionWork]
- Created when WaitForCompletionFunction is submitted
- Get JobRun from JobManager
- If found, in an executor thread, wait for its completion (while not terminated nor failed, wait)
- Else, get it from the history, set the value, then set the exception. This looks like a bug: we should set either the value or the exception, but not both
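The "while not terminated nor failed, wait" loop above is a classic condition-variable wait. A minimal model (illustrative; the real code lives in the Hyracks JobRun and runs on an executor thread):

```python
import threading

class JobRun:
    """Minimal model of waiting for job completion."""
    TERMINAL = ("TERMINATED", "FAILURE")

    def __init__(self):
        self.status = "RUNNING"
        self.cond = threading.Condition()

    def wait_for_completion(self):
        with self.cond:
            # Guard against spurious wakeups and late arrival:
            # re-check the predicate every time we wake up.
            while self.status not in self.TERMINAL:
                self.cond.wait()
            return self.status

    def set_status(self, status):
        with self.cond:
            self.status = status
            self.cond.notify_all()

run = JobRun()
t = threading.Thread(target=run.set_status, args=("TERMINATED",))
t.start()
result = run.wait_for_completion()
t.join()
```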
[JobCleanupWork]
- Created when:
- JobExecutor.abortJob which is called from:
- CancelJobWork
- JobStartWork
- RegisterNodeWork
- RemoveDeadNodesWork
- TaskFailureWork
- JobExecutor.startRunnableActivityClusters which is called from:
- RegisterNodeWork
- RemoveDeadNodesWork
- TaskCompleteWork
- TaskFailureWork
- JobStartWork
- Get the JobRun
- Call prepareComplete():
- If the status is FAILURE_BEFORE_EXECUTION, we call finalComplete
- Else if the status has been set before, we only call finalComplete if all nodes have completed cleanup
- Else: Set the job status and for all nodes that are still part of the system, call cleanUpJoblet
- If all nodes have left, then call finalComplete
- If the job has no target nodes, call finalComplete
- finalComplete notifies <<job finish>>
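The prepareComplete decision tree above can be sketched as one function. This is a simplified model with illustrative names (the real method also archives the run and handles several corner cases):

```python
def prepare_complete(run, status, live_nodes):
    """Sketch of the prepareComplete branches described above."""
    if status == "FAILURE_BEFORE_EXECUTION":
        return "finalComplete"
    if run.get("pending_status") is not None:
        # Status was set before: only finish once all cleanups are done.
        if not run["pending_cleanup_nodes"]:
            return "finalComplete"
        return "wait"
    run["pending_status"] = status
    # Only ask nodes that are still part of the cluster to clean up.
    targets = [n for n in run["target_nodes"] if n in live_nodes]
    if not targets:
        return "finalComplete"   # no target nodes, or all nodes have left
    run["pending_cleanup_nodes"] = set(targets)
    return f"cleanUpJoblet -> {sorted(targets)}"

run = {"pending_status": None, "pending_cleanup_nodes": set(),
       "target_nodes": ["nc1", "nc2"]}
action = prepare_complete(run, "TERMINATED", live_nodes={"nc1", "nc2"})
```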
[JobletCleanupNotificationWork]
- Created when a NotifyJobletCleanupFunction is submitted to the cluster controller
- NotifyJobletCleanupFunction is submitted:
- At the end of CleanupJobletWork
- At the end of NotifyTaskCompleteWork
- At the end of NotifyTaskFailureWork
- Removes the node from the list of pendingCleanupNodes
- Removes the job from the list of active jobs on the node
- If the list of nodes pending cleanup is empty, we call jobManager.finalComplete
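The bookkeeping above amounts to removing the reporting node from two sets and finishing the job once the last pending node has reported. A sketch (illustrative names; dict-based state instead of the real JobRun):

```python
def on_joblet_cleanup(job, node_to_active_jobs, node_id):
    """Sketch of JobletCleanupNotificationWork at the CC."""
    job["pending_cleanup_nodes"].discard(node_id)
    node_to_active_jobs[node_id].discard(job["id"])
    if not job["pending_cleanup_nodes"]:
        return "finalComplete"   # last pending NC has cleaned up its joblet
    return "wait"

job = {"id": 42, "pending_cleanup_nodes": {"nc1", "nc2"}}
active = {"nc1": {42}, "nc2": {42}}
first = on_joblet_cleanup(job, active, "nc1")
second = on_joblet_cleanup(job, active, "nc2")
```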
[TaskCompleteWork]
- Created when a NotifyTaskCompleteFunction is submitted to the cluster controller
- NotifyTaskCompleteFunction is submitted as part of NotifyTaskCompleteWork
- Possibly updates stats
- Call jobExecutor.notifyTaskComplete
- If the number of pending tasks is 0:
- Set task cluster status to COMPLETED
- Remove the task cluster from in-progress task clusters
- Start the next activity cluster
- In certain cases, we log:
"Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt
"Spurious task complete notification: " + taId + " Current state = " + taStatus
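The completion accounting above reduces to a pending-task counter per task cluster. A minimal sketch (illustrative; the real code also tracks attempts, which is what the two log messages guard against):

```python
class TaskCluster:
    """Sketch of completion accounting in notifyTaskComplete."""
    def __init__(self, num_tasks):
        self.pending = num_tasks
        self.status = "RUNNING"

    def notify_task_complete(self):
        self.pending -= 1
        if self.pending == 0:
            # Last task finished: mark the cluster COMPLETED, remove it
            # from the in-progress set, start the next activity cluster.
            self.status = "COMPLETED"
            return True
        return False

tc = TaskCluster(num_tasks=2)
first = tc.notify_task_complete()
second = tc.notify_task_complete()
```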
[TaskFailureWork]
- Created when a NotifyTaskFailureFunction is submitted to the cluster controller
- NotifyTaskFailureFunction is submitted as part of NotifyTaskFailureWork
- Report job failure to dataset directory service
- Get job executor and notify task failure
- Set attempt status as FAILED
- Abort task
- Abort other doomed tasks
- If reached max re-attempts, abort job
- Else, startRunnableActivityClusters
- On any failure that happens during TaskFailureWork itself, we abort the job
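The retry-or-abort choice above can be sketched as follows. The attempt limit and all names are illustrative assumptions (the actual limit is configured in Hyracks, not hard-coded here):

```python
MAX_REATTEMPTS = 3   # assumed limit; configurable in the real system

def on_task_failure(cluster):
    """Sketch of the failure path in TaskFailureWork."""
    cluster["attempt_status"] = "FAILED"   # mark the failed attempt
    cluster["attempts"] += 1
    # Aborting the failed task and other doomed tasks is elided here.
    if cluster["attempts"] > MAX_REATTEMPTS:
        return "abortJob"                  # gave up: abort the whole job
    return "startRunnableActivityClusters" # re-plan and retry

cluster = {"attempts": 0, "attempt_status": None}
```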
The Node Controller (NC) works:
[StartTasksWork]
- Created when a StartTasksFunction is submitted to a node controller and run on the work queue
- Creates the set of tasks and starts them. If starting any task fails, we submit a NotifyTaskFailureWork to the Node Controller queue; note that some tasks may already have been started successfully when that happens
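The partial-failure behavior above is worth making explicit: tasks start one by one, and a failure partway through leaves earlier tasks running while the failure is reported. A sketch (illustrative; task factories stand in for real task creation and start):

```python
from collections import deque

def start_tasks(task_factories, work_queue):
    """Sketch of StartTasksWork's partial-failure behavior."""
    started = []
    for name, factory in task_factories:
        try:
            started.append((name, factory()))   # create + start the task
        except Exception as e:
            # Earlier tasks are already running; report the failure
            # (NotifyTaskFailureWork) and stop starting further tasks.
            work_queue.append(("NotifyTaskFailureWork", name, str(e)))
            break
    return started

queue = deque()
ok = lambda: "running"
def boom():
    raise RuntimeError("task creation failed")
started = start_tasks([("t0", ok), ("t1", boom), ("t2", ok)], queue)
```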
[AbortTasksWork]
- Created when an AbortTasksFunction is submitted to a node controller and run on the work queue
- Gets the DatasetPartitionManager and aborts the reader for the job
- Get the joblet (if still running) and abort all of its tasks through thread interrupts
[CleanupJobletWork]
- Created when a CleanupJobletFunction is submitted to a node controller and run on the work queue
- CleanupJobletFunction is created in JobManager.prepareComplete
- Get the Node Partition Manager and unregister partitions of the job
- On an executor thread, deallocate partitions
- Remove the joblet from the joblet map
- Call joblet.cleanup
[NotifyTaskCompleteWork]
- Created as the last step in a task that ran successfully and scheduled to run on the node controller work queue
- Notify Cluster Controller of task completion (log on failure)
- Removes the task from the joblet
[NotifyTaskFailureWork]
- Created in:
- Start of a task if the task was aborted before the thread is registered.
- Last step in a failing task
- Last step if failed to create or start a task in StartTasksWork
- Gets the Node Partition Manager and abort reader for the job
- Notify Cluster Controller of task failure (log on failure)
- Removes the task from the joblet