IDIEP-118
Author:
Sponsor:
Created: 17.01.2024
Status: IN PROGRESS

Motivation

Currently, Apache Ignite 3 (AI3) has a simple mechanism for launching Compute jobs. A Compute job has the following interface:

/**
 * A Compute job that may be executed on a single Ignite node, on several nodes, or on the entire cluster.
 *
 * @param <R> Job result type.
 */
public interface ComputeJob<R> {
    /**
     * Executes the job on an Ignite node.
     *
     * @param context  The execution context.
     * @param args     Job arguments.
     * @return Job result.
     */
    R execute(JobExecutionContext context, Object... args);
}
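
For illustration, a minimal job implementation might look like this (the class name and its logic are hypothetical, shown only to demonstrate the contract):

public class ConcatJob implements ComputeJob<String> {
    /** Concatenates all arguments into a single string. */
    @Override
    public String execute(JobExecutionContext context, Object... args) {
        StringBuilder result = new StringBuilder();
        for (Object arg : args) {
            result.append(arg);
        }
        return result.toString();
    }
}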


The only way to use it currently is through the Code Deployment API: the user deploys the required deployment units and can then execute a job via the public Compute API, for example via the following method:


/**
 * Executes a {@link ComputeJob} of the given class on a single node from a set of candidate nodes.
 *
 * @param nodes    Candidate nodes; the job will be executed on one of them.
 * @param units    Deployment units. Can be empty.
 * @param jobClassName Name of the job class to execute.
 * @param args     Arguments of the job.
 * @param <R>      Job result type
 * @return Future that will be completed with the job result.
 */
<R> CompletableFuture<R> executeAsync(
        Set<ClusterNode> nodes,
        List<DeploymentUnit> units,
        String jobClassName,
        Object... args
);
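
For example, given an Ignite instance ignite and a candidate node node, a call could look like this (the deployment unit name/version and job class are illustrative):

// The unit "unitName" version "1.0.0" is assumed to be deployed already
// and to contain org.example.ConcatJob.
CompletableFuture<String> result = ignite.compute().executeAsync(
        Set.of(node),
        List.of(new DeploymentUnit("unitName", "1.0.0")),
        "org.example.ConcatJob",
        "a", "b", "c"
);

result.thenAccept(System.out::println);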


As we can see, the Compute API is currently quite limited, and we need to add a management component to it that allows controlling the mechanism of job execution, not just the definition of jobs.

Description

Requirements

Queues and Priority

It is necessary to add a queuing mechanism for executing compute jobs. Currently we do not impose any order on job execution: the process is not synchronized in any way, and executions always race with each other.

The queue must meet the following requirements:

  1. The basic mode must follow FIFO semantics.
  2. The queue must provide a dynamic modification mechanism: the user must be able to move a job up or down the queue on demand.
  3. The queue size must be configurable.
  4. The queue should accumulate jobs only when the executing node has no free resources to execute them; at the moment, the only such resources are the worker threads of the thread pool.

Failover

There are no special requirements for the failover mechanism. The user should be able to save an intermediate job state in the cache.

Cancellation

Any job can be canceled. If the job is still in the queue, we must guarantee that it will not move to the execution state; if the job is already executing, everything possible must be done to cancel it. Naturally, we cannot give any strict guarantees here, because the outcome depends on the user's environment, for example whether thread interruption is honored or not.

Job state

It is proposed to introduce the concept of job state. Using this state, users will be able to gain insight into the current state of their jobs in the system.

It is proposed to introduce 7 states:

  1. Submitted - the job has been correctly created and accepted by the cluster. Initial state.
  2. Queued - the job is in the queue waiting for execution.
  3. Executing - the job is currently being executed.
  4. Completed - the job was executed successfully and the result has been returned.
  5. Failed - the job terminated unexpectedly during execution and could not be restarted (restart is not supported, or several restarts in a row have failed).
  6. Canceling - the job thread has been interrupted and the cancel flag is raised.
  7. Canceled - the job thread is no longer alive.


Let us now describe the process of transition between states. First, the transition graph between the states:

[State transition diagram]

Necessary and sufficient conditions for transitions between states:

  1. Submitted → Queued
    Once a job has been created, adding it to the queue triggers the transition to the Queued state.
  2. Queued → Executing
    The job is extracted from the queue; after that, a sufficient condition for the transition is the creation of an executor for the job. The need to download the required deployment units does not block the transition to the Executing state.
  3. Executing → Completed
    A job is completed only when the executor for this job has been freed and can be used to execute the next job. The result of the job must also be returned to the user. If the job was called via a thin client, the process of transferring the result to the client must not in any way affect the job state transition.
  4. Executing → Queued
    A repeatable job has failed and its retry counter is less than the maximum number of retries.
  5. Executing → Failed
    A job has failed and either it is not repeatable or its retry counter has been exhausted.
  6. Submitted → Canceled
    The job was canceled before being added to the queue.
  7. Queued → Canceled
    The job was canceled while it was in the queue.
  8. Executing → Canceling
    The cancel command has been accepted and cancellation of the job has started.
  9. Canceling → Canceled
    The job was successfully canceled: the job thread is no longer alive (it was interrupted or stopped cooperatively).
  10. Canceling → Completed
    The job thread was interrupted, but the interrupt did not take effect and the job finished successfully.
  11. Canceling → Failed
    The job thread was interrupted, but the interrupt did not take effect and the job failed.
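
The conditions above can be captured compactly as an allowed-transition map. Below is a minimal validation sketch (not part of the proposed API) over the JobState enum defined in the New API section:

import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Encodes transitions 1-11 above: a state may only move to the states in its set.
final class JobStateTransitions {
    private static final Map<JobState, Set<JobState>> ALLOWED = Map.of(
            JobState.SUBMITTED, EnumSet.of(JobState.QUEUED, JobState.CANCELED),
            JobState.QUEUED,    EnumSet.of(JobState.EXECUTING, JobState.CANCELED),
            JobState.EXECUTING, EnumSet.of(JobState.COMPLETED, JobState.QUEUED,
                    JobState.FAILED, JobState.CANCELING),
            JobState.CANCELING, EnumSet.of(JobState.CANCELED, JobState.COMPLETED, JobState.FAILED));

    static boolean isValid(JobState from, JobState to) {
        return ALLOWED.getOrDefault(from, EnumSet.noneOf(JobState.class)).contains(to);
    }
}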

Priority

TL;DR: 

This is a priority queue with priority values in the range [Integer.MIN_VALUE, Integer.MAX_VALUE]. Jobs with the same priority are resolved with a FIFO strategy.

First, let's formally define what priority is. The priority of a compute job is an int value that determines the order in which jobs should be executed. The priority of a job is specified at the time the job is called and can be changed until the job begins execution.

A job X is called higher priority than Y if the priority of X is greater than the priority of Y.

A job X is called equal priority to Y if the priority of X is equal to the priority of Y.

A job X is called lower priority than Y if the priority of X is less than the priority of Y.

Queue

Clearly, if our cluster and its nodes had a large enough amount of resources to perform an unbounded number of user jobs, we would not need prioritization at all; in reality, hardware resources are limited. Therefore, sooner or later a situation will occur in which there are more user jobs than resources to execute them. For this case, it is proposed to introduce the concept of a job queue.

A job queue is a structure holding the upcoming Compute jobs of a node. From this definition it follows that each node has its own queue, and no mechanism for sharing information between queues is planned yet.

The standard queue interface is almost enough for us, except for a method that adds an entry with a priority. When jobs are added to the queue in the normal way, the FIFO strategy applies, i.e. the default priority value when adding a job to the queue is 0.
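
A minimal sketch of such a queue (the QueueEntry wrapper is illustrative, not part of the proposed API): jobs are ordered by priority descending, and a monotonically increasing sequence number resolves equal priorities in FIFO order.

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Higher priority first; equal priorities are resolved FIFO by submission order.
final class JobQueue {
    record QueueEntry(Runnable job, int priority, long seq) { }

    private final AtomicLong seq = new AtomicLong();

    private final PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<>(
            11, // default initial capacity
            Comparator.comparingInt(QueueEntry::priority).reversed()
                    .thenComparingLong(QueueEntry::seq));

    void submit(Runnable job, int priority) {
        queue.add(new QueueEntry(job, priority, seq.getAndIncrement()));
    }

    QueueEntry take() throws InterruptedException {
        return queue.take();
    }
}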

Failover

Let's first describe how the client currently interacts with the Compute API. There is the concept of a coordinator node: the node to which the request to execute a Compute job was sent. After receiving the request, the coordinator selects a node on which the job will be executed (or several nodes in the case of a broadcast call); we call such a node a worker node. In some cases, the coordinator node and the worker node may be the same node. Let's describe the different cases in which the client, the worker node, or the coordinator node shuts down.

Worker node shutdown

In this case, job execution stops and the job needs to be restarted. The coordinator will see that the worker node has gone down, and all the jobs that this coordinator sent to it must be redistributed to other nodes. In this context it does not matter what state the jobs were in, queued or executing: we do not offer a safepoint mechanism, but job state can be written to the cache, so when a job is started on another worker node it can read from the cache the state it wrote last time.

Coordinator node shutdown

When the coordinator node shuts down, the execution of the job does not stop (except when the worker node and the coordinator node coincide), but the client loses the ability to receive the result of the job execution.

Client disconnect

The current architecture implies that if the connection with the client is lost, the execution of the job must be canceled, since there is no longer any way to deliver the result of the job execution.

See Feature Improvements.

Job repeatable

The repeatable mechanism provides the ability to restart a job if it fails as a result of an unhandled exception thrown during execution. The repeatable config should be flexible and contain the following properties:

  1. Max retries counter
  2. Exception type filter (Feature improvements)
  3. Priority change (Feature improvements)


The repeatable config should be defined at the job execution call and sent with the execution request.

As described in the state transitions above (Executing → Queued), if a repeatable job fails with an unhandled exception, it is re-added to the queue with the same priority.

Cancellation

As stated above, we must provide a mechanism for canceling jobs. Users can cancel jobs that are in one of the statuses SUBMITTED, QUEUED, or EXECUTING:

  1. A job in the SUBMITTED status must not be added to the execution queue.
  2. A job in the QUEUED status, after cancel is called, must never change to the EXECUTING status and must be removed from the execution queue immediately.
  3. A job in the EXECUTING status should be interrupted, although this is not possible in all cases.

Cooperative cancellation

It is proposed to introduce a mechanism for cooperative cancellation. This means adding a property to the job execution context through which the user can periodically check the job's cancellation status inside the job code itself, allowing users to write more flexible code. Note that we will not depart from Java conventions here: cancellation will still interrupt the job execution thread. In a sense, this property duplicates Thread.currentThread().isInterrupted(), but does so more explicitly. As a result, any standard IO operation on a Compute job thread will be interrupted at the moment the job is canceled, and the user will be able to receive an exception.
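
From the job author's point of view this could look as follows, assuming a cancellation flag such as isCancelled() is added to JobExecutionContext (the accessor name is an assumption of this sketch):

// Hypothetical long-running job that cooperatively checks the cancel flag
// between batches instead of relying on a thread interrupt alone.
public class BatchJob implements ComputeJob<Integer> {
    @Override
    public Integer execute(JobExecutionContext context, Object... args) {
        int processed = 0;
        for (Object batch : args) {
            if (context.isCancelled()) { // assumed flag, see note above
                return processed; // stop early, returning partial progress
            }
            process(batch);
            processed++;
        }
        return processed;
    }

    private void process(Object batch) {
        // Job-specific work.
    }
}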

Job ID

To introduce a new management API, we need to introduce the concept of a job ID. Using this ID, the user will be able to identify a job and perform the necessary operations on it, such as changing its priority, canceling it, and viewing its current status.

There are several problems here. The first is that these IDs must be unique. The second is their automatic generation: obviously, the user does not always want to think about generating an ID, especially a unique one.

It seems that we can generate this ID on the client side when the Compute job is called; if we use a UUID as the ID, the collision problem solves itself.

In order to return this generated ID to the client, we want to use the notification mechanism in the client protocol.

It should be noted that the current thin client protocol will have to be changed. This is not a big change, though, because we already have a design for notifications in the client protocol and can use it.

New API

Execution API changes

Current state 

org.apache.ignite.compute.IgniteCompute

<R> CompletableFuture<R> executeAsync(
        Set<ClusterNode> nodes,
        List<DeploymentUnit> units,
        String jobClassName,
        Object... args
);

New API

org.apache.ignite.compute.IgniteCompute
<R> JobExecution<R> executeAsync(
        JobExecutionOptions options,
        Set<ClusterNode> nodes,
        List<DeploymentUnit> units,
        String jobClassName,
        Object... args
);


public interface JobExecutionOptions {
    /** Job priority; an int, as defined in the Priority section. Default is 0. */
    int priority();

    /** Maximum number of retries for a repeatable job. */
    short maxRetries();
}


public enum JobState {
    SUBMITTED, QUEUED, EXECUTING, FAILED, COMPLETED, CANCELING, CANCELED
}


public class JobStatus {
    private final UUID id;
    private JobState state;
    private String ownership;
    private Instant createTime;
    private Instant startTime;
    private Instant finishTime;

    // Accessors (id(), state(), etc.) are omitted for brevity.
}


public interface JobExecution<T> {
    CompletionStage<T> resultAsync();
    CompletionStage<JobStatus> statusAsync();
    default CompletionStage<UUID> idAsync() {
        return statusAsync().thenApply(JobStatus::id);
    }
    CompletionStage<Boolean> cancelAsync();
    CompletionStage<Boolean> changePriority(int newPriority);
}
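
Putting it together, a call through the proposed API could look like this. The construction of JobExecutionOptions via an anonymous class is illustrative, since the proposal does not define a factory or builder; ignite, node, unit and class names are also illustrative.

JobExecutionOptions options = new JobExecutionOptions() {
    @Override public int priority() { return 10; }
    @Override public short maxRetries() { return 3; }
};

JobExecution<String> execution = ignite.compute().executeAsync(
        options,
        Set.of(node),
        List.of(new DeploymentUnit("unitName", "1.0.0")),
        "org.example.ConcatJob",
        "a", "b", "c");

// The job ID arrives via a client notification and identifies the job
// for management operations.
UUID jobId = execution.idAsync().toCompletableFuture().join();

// Raise the priority while the job is still queued, then wait for the result.
execution.changePriority(100);
String result = execution.resultAsync().toCompletableFuture().join();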

Management API

REST API

POST /management/v1/compute/priority - change the priority of a job

GET /management/v1/compute/jobs - list jobs and their current statuses

DELETE /management/v1/compute/cancel - cancel a job

Tickets

[IGNITE-20838] Compute: queues, priority, failover, cancellation - ASF JIRA (apache.org)
