ID | IEP-118 |
Author | |
Sponsor | |
Created | 17.01.2024 |
Status | IN PROGRESS |
Currently, Apache Ignite 3 (AI3) has a simple mechanism for launching Compute jobs. A Compute job has the following interface:
/**
 * A Compute job that may be executed on a single Ignite node, on several nodes, or on the entire cluster.
 *
 * @param <R> Job result type.
 */
public interface ComputeJob<R> {
    /**
     * Executes the job on an Ignite node.
     *
     * @param context The execution context.
     * @param args Job arguments.
     * @return Job result.
     */
    R execute(JobExecutionContext context, Object... args);
}
Currently, the only way to use it is through the Code Deployment API: the user deploys one or more deployment units and then executes a job through the public compute API, for example with the following method:
/**
 * Executes a {@link ComputeJob} of the given class on a single node from a set of candidate nodes.
 *
 * @param nodes Candidate nodes; the job will be executed on one of them.
 * @param units Deployment units. Can be empty.
 * @param jobClassName Name of the job class to execute.
 * @param args Arguments of the job.
 * @param <R> Job result type
 * @return CompletableFuture Job result.
 */
<R> CompletableFuture<R> executeAsync(
    Set<ClusterNode> nodes,
    List<DeploymentUnit> units,
    String jobClassName,
    Object... args
);
As we can see, the compute API is currently quite limited: it lets users define jobs, but not manage how they are executed. We need to add a management component for the job execution mechanism.
It is necessary to add a queuing mechanism for executing compute jobs. At present, jobs are executed in no particular order: execution is not synchronized in any way, and jobs always race with each other.
The queue must meet the following requirements
There are no special requirements for the failover mechanism. The user should be able to save a job's intermediate state in the cache.
Any job can be canceled. If the job is in the queue, then it is necessary to guarantee that it will not go into the execution state, but if the job is already at the execution stage, everything necessary must be done to cancel it. Naturally, we cannot give any strict guarantees here, because such settings depend on the user’s environment, whether thread interruptions are enabled or not.
It is proposed to introduce the concept of job state. Using this state, users will be able to gain insight into the current state of the system.
It is proposed to introduce seven states: SUBMITTED, QUEUED, EXECUTING, FAILED, COMPLETED, CANCELING, and CANCELED.
Let us now describe the transitions between states. First, the transition graph between states:
Necessary and sufficient conditions for transition between states:
A job is completed only when the executor for this job is freed and can be used to complete the next job. The result of the job must also be returned to the user. If a job is called using a thin client, the process of transferring the result to the client should not in any way affect the process of changing the state of the job.
Executing → Queued
A repeatable job has failed and its retry counter is less than the maximum number of retries.
Executing → Failed
A job has failed, and either it is not repeatable or its retry counter has reached the maximum number of retries.
Submitted → Canceled
A job was canceled before being added to the queue.
TL;DR:
This is a priority queue with priority values in the range [Integer.MIN_VALUE, Integer.MAX_VALUE]. Jobs with the same priority are executed in FIFO order.
First, let us formally define priority. The priority of a compute job is an int value that determines the order in which jobs are executed. It is specified when the job is submitted and can be changed until the job begins execution.
A job X has higher priority than Y if the priority of X is greater than the priority of Y.
A job X has equal priority to Y if the priority of X is equal to the priority of Y.
A job X has lower priority than Y if the priority of X is less than the priority of Y.
It’s clear that if our cluster and nodes have a large enough amount of resources to perform an infinite number of user jobs, then we don’t need prioritization at all, but in reality, hardware resources are often limited. Therefore, sooner or later a situation will occur when there are more user jobs than resources to complete them. For this case, it is proposed to introduce the concept of a jobs queue.
A job queue is a structure showing upcoming Compute jobs for each node. From this definition it follows that each node will have its own queue and no mechanism for sharing information about queues is yet expected.
The standard queue interface is almost sufficient for us; the only addition is a method for adding a job with a priority. A job added without an explicit priority gets the default priority 0, so jobs added this way follow the FIFO strategy.
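The ordering rules described here (higher priority first, FIFO among equal priorities, default priority 0) can be sketched with the standard java.util.concurrent.PriorityBlockingQueue. This is only an illustration under stated assumptions, not the proposed implementation: the JobQueueSketch and Entry names and the sequence counter used to break ties are hypothetical.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-node job queue: higher priority first, FIFO among equal priorities. */
final class JobQueueSketch {
    /** Queue entry; seq records insertion order and is used only to break priority ties. */
    static final class Entry {
        final String jobName;
        final int priority;
        final long seq;

        Entry(String jobName, int priority, long seq) {
            this.jobName = jobName;
            this.priority = priority;
            this.seq = seq;
        }
    }

    private static final Comparator<Entry> ORDER = (a, b) -> {
        // A greater priority value is served first.
        int byPriority = Integer.compare(b.priority, a.priority);
        // Equal priorities fall back to insertion order (FIFO).
        return byPriority != 0 ? byPriority : Long.compare(a.seq, b.seq);
    };

    private final AtomicLong nextSeq = new AtomicLong();
    private final PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>(16, ORDER);

    /** Adds a job with the default priority 0. */
    void add(String jobName) {
        add(jobName, 0);
    }

    /** Adds a job with an explicit priority. */
    void add(String jobName, int priority) {
        queue.add(new Entry(jobName, priority, nextSeq.getAndIncrement()));
    }

    /** Returns the name of the next job to execute, or null if the queue is empty. */
    String poll() {
        Entry e = queue.poll();
        return e == null ? null : e.jobName;
    }
}
```

The explicit sequence number is needed because a binary-heap priority queue does not by itself preserve insertion order among elements that compare as equal.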
Let us first describe how a client currently interacts with the Compute API. There is the concept of a coordinator node: the node to which the request to execute a Compute job was sent. After receiving the request, the coordinator selects the node on which the job will be executed (or several nodes in the case of a broadcast call); we call this node a worker node. In some cases, the coordinator node and the worker node may be the same node. Let us now describe what happens when the client, the worker node, or the coordinator node shuts down.
When a worker node shuts down, job execution stops and the job must be restarted. The coordinator detects that the worker node has shut down and redistributes all the jobs it had sent to that node to other nodes. It does not matter whether the jobs were queued or already executing. We do not offer a safepoint mechanism, but a job can write its state to the cache, so when it is launched on another worker node it can read back the state it last saved.
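The idea of resuming from saved state can be illustrated with a small sketch. It is written under loud assumptions: a plain Map stands in for the cache (this is not the Ignite cache or table API), and the CheckpointSketch class, the key, and the failAt parameter that simulates a worker node shutdown are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical job that saves its progress so a restart on another node can resume it. */
final class CheckpointSketch {
    /**
     * Sums 1..n, saving {lastStep, partialSum} after every step.
     *
     * @param cache Stand-in for the cache that holds job state (an assumption, not an Ignite API).
     * @param key Key under which this job stores its state.
     * @param n Last number to add.
     * @param failAt Step at which a worker shutdown is simulated; 0 means never.
     */
    static long sumWithCheckpoints(Map<String, long[]> cache, String key, int n, int failAt) {
        // Resume from the last saved state, or start from scratch.
        long[] saved = cache.getOrDefault(key, new long[] {0, 0});
        long sum = saved[1];

        for (int i = (int) saved[0] + 1; i <= n; i++) {
            if (i == failAt) {
                throw new IllegalStateException("simulated worker node shutdown");
            }
            sum += i;
            cache.put(key, new long[] {i, sum});
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, long[]> cache = new ConcurrentHashMap<>();
        try {
            sumWithCheckpoints(cache, "job-1", 10, 6); // first attempt dies at step 6
        } catch (IllegalStateException e) {
            // The coordinator would now resubmit the job to another worker node.
        }
        System.out.println(sumWithCheckpoints(cache, "job-1", 10, 0)); // resumes from step 6
    }
}
```

The second call does not repeat steps 1 to 5, because it starts from the state the first attempt wrote before it failed.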
In the case when the coordinator node is turned off, the execution of the job will not stop (except for the case where the worker node and the coordinator node coincide), but in this case the client loses the opportunity to receive the result of the job execution.
The current architecture implies that if the connection with the client is broken, the execution of the job must be canceled, since there is no way to get the result of the job execution.
See Feature Improvements.
The repeatable mechanism provides the ability to restart a job if it fails because of an unhandled exception thrown during execution. The repeatable config should be flexible and contain the following properties:
Repeatable config should be defined on a job execution call and sent with an execution request.
As shown in the diagram above, if a repeatable task fails with some unhandled exception, it is re-added to the queue with the same priority.
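The retry decision described here and in the Executing → Queued and Executing → Failed conditions can be written down as a small function. This is a sketch under assumptions: the RetrySketch class and its parameter names are hypothetical, and a job with maxRetries equal to 0 is assumed to be what the text calls a non-repeatable job.

```java
/** Hypothetical helper showing where a job goes after an unhandled exception. */
final class RetrySketch {
    enum JobState { SUBMITTED, QUEUED, EXECUTING, FAILED, COMPLETED, CANCELING, CANCELED }

    /**
     * Returns the state an EXECUTING job moves to after it fails.
     *
     * @param maxRetries Configured maximum number of retries; 0 is assumed to mean "not repeatable".
     * @param retriesDone Number of retries already performed for this job.
     */
    static JobState onFailure(int maxRetries, int retriesDone) {
        // Retries remain: re-queue the job (with the same priority, per the text).
        // Otherwise the job is not repeatable or has exhausted its retries.
        return retriesDone < maxRetries ? JobState.QUEUED : JobState.FAILED;
    }
}
```

For example, with maxRetries set to 3, the third retry is still allowed (retriesDone is 2), while a failure after that moves the job to FAILED.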
As stated above, we must provide a mechanism for canceling jobs. Users can cancel jobs that are in one of the states SUBMITTED, QUEUED, or EXECUTING.
It is proposed to introduce a mechanism for cooperative cancellation. This means exposing a cancellation flag through the parameters of the job execution method, so that the job code can periodically check whether the job has been canceled. This allows users to write more flexible code. We will not depart from Java conventions here, however: canceling a job will also interrupt the thread that executes it. In a sense, the flag duplicates Thread.currentThread().isInterrupted(), only more explicitly. As a result, any standard IO operation on the Compute job's thread that responds to thread interruption will be interrupted at the moment the job is canceled, and the user code will receive an exception.
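A job that cooperates with cancellation might look roughly like the sketch below. The CancelToken class and its isCancelled() method are hypothetical stand-ins for whatever flag the execution context ends up exposing; the proposal does not define this API. Only Thread.currentThread().isInterrupted() is a real JDK call.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of a job that checks for cancellation between work items. */
final class CooperativeCancelSketch {
    /** Stand-in for the cancellation flag a job could receive through its execution context. */
    static final class CancelToken {
        private final AtomicBoolean cancelled = new AtomicBoolean();

        boolean isCancelled() {
            return cancelled.get();
        }

        void cancel() {
            cancelled.set(true);
        }
    }

    /** Processes up to the given number of items and returns how many were actually processed. */
    static int runJob(CancelToken token, int items) {
        int processed = 0;
        for (int i = 0; i < items; i++) {
            // Explicit flag first, then the standard interruption status as a fallback.
            if (token.isCancelled() || Thread.currentThread().isInterrupted()) {
                break;
            }
            processed++; // placeholder for one unit of real work
        }
        return processed;
    }
}
```

In this sketch the executor would be expected to call cancel() on the token and also interrupt the job's thread, so that code blocked in an interruptible call is woken up as well.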
To introduce a new management API, we need to introduce such a concept as job ID. Using this ID, the user will be able to identify the job and perform the necessary operations on it, such as: changing the priority for the task, canceling the task, viewing its current status.
Here we have several problems. The first is the need for uniqueness for these IDs. The second is the issue of their automatic generation. Obviously, the user does not always want to think about generating some kind of ID, especially a unique one.
It seems that we can generate this ID on the client side when calling a Compute job, and if we use UUID as an ID, then the problem with collisions is solved by itself.
In order to return this generated ID to the client, we want to use the notification mechanism in the client protocol.
Here we should say that the current state of thin client protocol will be changed. But this is not a big change, because we have a design for notifications in client protocol and can use it.
Current state
org.apache.ignite.compute.IgniteCompute

<R> CompletableFuture<R> executeAsync(
    Set<ClusterNode> nodes,
    List<DeploymentUnit> units,
    String jobClassName,
    Object... args
);
New API
org.apache.ignite.compute.IgniteCompute

<R> JobExecution<R> executeAsync(
    JobExecutionOptions options,
    Set<ClusterNode> nodes,
    List<DeploymentUnit> units,
    String jobClassName,
    Object... args
);
public interface JobExecutionOptions {
    long priority();

    short maxRetries();
}

public enum JobState {
    SUBMITTED,
    QUEUED,
    EXECUTING,
    FAILED,
    COMPLETED,
    CANCELING,
    CANCELED
}

public class JobStatus {
    private final UUID id;
    private JobState state;
    private String ownership;
    private Instant createTime;
    private Instant startTime;
    private Instant finishTime;
}
public interface JobExecution<T> {
    CompletionStage<T> resultAsync();

    CompletionStage<JobStatus> statusAsync();

    default CompletionStage<UUID> idAsync() {
        return statusAsync().thenApply(status -> status.id());
    }

    CompletionStage<Boolean> cancelAsync();

    CompletionStage<Boolean> changePriority(long newPriority);
}
POST /management/v1/compute/priority
GET /management/v1/compute/jobs
DELETE /management/v1/compute/cancel
[IGNITE-20838] Compute: queues, priority, failover, cancellation - ASF JIRA (apache.org)