Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink currently supports job submission through its /jars/:jarid/run API call. While processing the request, Flink runs the main method of a previously uploaded JAR. This method may contain operations that take arbitrarily long to complete, or genuine issues that prevent a job from ever being generated. In those cases, clients cannot cancel an ongoing job submission and remain blocked until a timeout is reached before realizing that no progress is being made.
Therefore, we propose a new set of API calls to support asynchronous job submission: creating, listing, tracking the completion of, and deleting job submissions. This decouples clients from the passive wait and gives them more control over job submissions.
Parts of the design are similar to what Flink currently uses for savepoint submissions, in an attempt to reuse the existing setup for long-running operations.
Public Interfaces
Request Handlers
The current synchronous job submission API must not change, for backwards compatibility. Therefore, the only expected change to existing classes is in WebSubmissionExtension, which must register the handlers needed for asynchronous job submission.
```java
package org.apache.flink.runtime.webmonitor;

public class WebSubmissionExtension implements WebMonitorExtension {

    public WebSubmissionExtension(
            Configuration configuration,
            GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
            Map<String, String> responseHeaders,
            CompletableFuture<String> localAddressFuture,
            Path jarDir,
            Executor executor,
            Time timeout,
            Supplier<ApplicationRunner> applicationRunnerSupplier)
            throws Exception {
        // ... the existing (synchronous) JAR handlers are created and registered unchanged ...

        // The new handlers are inner classes of JarRunAsyncHandlers (as with SavepointHandlers),
        // so they are created through an enclosing instance.
        final JarRunAsyncHandlers jarRunAsyncHandlers = new JarRunAsyncHandlers(...);
        final JarRunAsyncHandlers.SubmitJarRunAsyncHandler submitJarRunAsyncHandler =
                jarRunAsyncHandlers.new SubmitJarRunAsyncHandler(...);
        final JarRunAsyncHandlers.ListJarRunAsyncHandler listJarRunAsyncHandler =
                jarRunAsyncHandlers.new ListJarRunAsyncHandler(...);
        final JarRunAsyncHandlers.StatusJarRunAsyncHandler statusJarRunAsyncHandler =
                jarRunAsyncHandlers.new StatusJarRunAsyncHandler(...);
        final JarRunAsyncHandlers.DeleteJarRunAsyncHandler deleteJarRunAsyncHandler =
                jarRunAsyncHandlers.new DeleteJarRunAsyncHandler(...);

        // Register each handler together with the headers (URL, HTTP method, message types) of its endpoint.
        webSubmissionHandlers.add(Tuple2.of(SubmitJarRunAsyncHeaders.getInstance(), submitJarRunAsyncHandler));
        webSubmissionHandlers.add(Tuple2.of(StatusJarRunAsyncHeaders.getInstance(), statusJarRunAsyncHandler));
        webSubmissionHandlers.add(Tuple2.of(ListJarRunAsyncHeaders.getInstance(), listJarRunAsyncHandler));
        webSubmissionHandlers.add(Tuple2.of(DeleteJarRunAsyncHeaders.getInstance(), deleteJarRunAsyncHandler));
    }
}
```
Apart from the headers of the new API calls, the existing JarRunRequestBody needs to be extended with a triggerId and a jarId. The triggerId should be a unique value generated by the client and used by Flink to identify retries of a request (similar to an idempotency token). The jarId refers to a previously uploaded JAR. Both fields are added in JarRunAsyncRequestBody below:
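```java
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import java.util.List;

public class JarRunAsyncRequestBody extends JarRunRequestBody {
    private final String triggerId; // client-generated idempotency token
    private final String jarId;     // ID of a previously uploaded JAR

    public JarRunAsyncRequestBody(String entryClassName, String programArguments,
            List<String> programArgumentsList, Integer parallelism, JobID jobId,
            Boolean allowNonRestoredState, String savepointPath,
            RestoreMode restoreMode, String triggerId, String jarId) {
        // All synchronous run options are handled by the existing parent class.
        super(entryClassName, programArguments, programArgumentsList, parallelism, jobId,
                allowNonRestoredState, savepointPath, restoreMode);
        this.triggerId = triggerId;
        this.jarId = jarId;
    }

    public String getTriggerId() { return triggerId; }

    public String getJarId() { return jarId; }
}
```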
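The proposal only fixes the token's length (64 ASCII characters, per the triggerId query parameter of /run-async) and requires it to be unique per logical request. A minimal client-side sketch, assuming the client hex-encodes 32 bytes from SecureRandom; the class name TriggerIds and the choice of hex encoding are hypothetical, not part of the proposal.

```java
import java.security.SecureRandom;

/** Hypothetical client-side helper that generates a 64-character ASCII triggerId. */
public final class TriggerIds {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    private TriggerIds() {}

    /** 32 random bytes, hex-encoded, yield exactly 64 ASCII characters. */
    public static String newTriggerId() {
        byte[] bytes = new byte[32];
        RANDOM.nextBytes(bytes);
        char[] out = new char[64];
        for (int i = 0; i < bytes.length; i++) {
            out[2 * i] = HEX[(bytes[i] >> 4) & 0x0F]; // high nibble
            out[2 * i + 1] = HEX[bytes[i] & 0x0F];    // low nibble
        }
        return new String(out);
    }
}
```

The token should be generated once per logical submission and resent unchanged on every retry; generating a fresh one per attempt would defeat the deduplication.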
As with SavepointHandlers, the JarRunAsyncHandlers will be grouped under a single class, and each of them specializes the JarRunAsyncHandlerBase class.
```java
package org.apache.flink.runtime.webmonitor.handlers;

/** Groups the asynchronous JAR run handlers, following the structure of SavepointHandlers. */
public class JarRunAsyncHandlers {
    // Common logic shared by the four handlers.
    private abstract static class JarRunAsyncHandlerBase { /* ... */ }

    // POST /run-async
    public class SubmitJarRunAsyncHandler extends JarRunAsyncHandlerBase { /* ... */ }
    // GET /run-async
    public class ListJarRunAsyncHandler extends JarRunAsyncHandlerBase { /* ... */ }
    // GET /run-async/:triggerid
    public class StatusJarRunAsyncHandler extends JarRunAsyncHandlerBase { /* ... */ }
    // DELETE /run-async/:triggerid
    public class DeleteJarRunAsyncHandler extends JarRunAsyncHandlerBase { /* ... */ }
}
```
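Because the four handlers must observe the same set of run requests, they need some shared state. A minimal in-memory sketch of that state, using only the JDK and assuming each main method runs as a task on a plain ExecutorService. RunAsyncRegistry, the String-typed run configuration, and the rejection of a reused triggerId with a different configuration are all assumptions for illustration, not part of the proposal or of Flink's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical registry that the four JarRunAsyncHandlers could share, keyed by triggerId. */
public class RunAsyncRegistry {
    public enum Status { IN_PROGRESS, COMPLETED }

    /** A tracked run request: the configuration it was submitted with and its pending job ID. */
    private static final class Entry {
        final String runConfig;
        final Future<String> jobId;

        Entry(String runConfig, Future<String> jobId) {
            this.runConfig = runConfig;
            this.jobId = jobId;
        }
    }

    private final Map<String, Entry> entries = new LinkedHashMap<>();
    private final ExecutorService executor;

    public RunAsyncRegistry(ExecutorService executor) {
        this.executor = executor;
    }

    /** Submit handler: runs main() in the background unless this triggerId is already tracked. */
    public synchronized String submit(String triggerId, String runConfig, Callable<String> runMain) {
        Entry existing = entries.get(triggerId);
        if (existing == null) {
            entries.put(triggerId, new Entry(runConfig, executor.submit(runMain)));
        } else if (!existing.runConfig.equals(runConfig)) {
            // Assumption: the proposal leaves this case open; rejecting it is one possible choice.
            throw new IllegalArgumentException("triggerId reused with a different run configuration");
        } // else: a retry of the same request, which is ignored
        return triggerId;
    }

    /** Status handler: COMPLETED once main() has returned, failed, or been cancelled. */
    public synchronized Status status(String triggerId) {
        Entry entry = entries.get(triggerId);
        if (entry == null) {
            throw new IllegalArgumentException("unknown triggerId: " + triggerId);
        }
        return entry.jobId.isDone() ? Status.COMPLETED : Status.IN_PROGRESS;
    }

    /** List handler: all tracked triggerIds in submission order. */
    public synchronized List<String> list() {
        return new ArrayList<>(entries.keySet());
    }

    /** Delete handler: forgets the request and interrupts main() if it is still running. */
    public synchronized boolean delete(String triggerId) {
        Entry entry = entries.remove(triggerId);
        if (entry == null) {
            return false;
        }
        entry.jobId.cancel(true);
        return true;
    }
}
```

Keeping the check-then-insert in submit under one lock is what guarantees that concurrent retries with the same triggerId start at most one run. A real implementation would also need a policy for evicting completed entries.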
REST API Contracts
The following are the REST API contracts for the submission, listing, tracking and deletion of the asynchronous operations, respectively.
/run-async
- Verb: POST
- Response code: 200 OK
- Description: Submits a job by running either a JAR previously uploaded via /jars/upload or a JAR uploaded alongside the request (the latter is not kept after the request finishes). Program arguments can be passed either via the JSON request body (recommended) or via query parameters. Unlike /jars/:jarid/run, the job is submitted in the background without blocking the client. If a run request with the same triggerId and the same remaining run configuration is already ongoing, the second request is ignored.
- Query parameters:
  - allowNonRestoredState (optional): Boolean value that specifies whether the job submission should be rejected if the savepoint contains state that cannot be mapped back to the job.
  - entry-class (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the JAR file manifest.
  - triggerId (optional): String value of 64 ASCII characters that serves as the idempotency token.
  - jarId (optional): String value that identifies a JAR. When uploading the JAR, a path is returned whose filename is the ID. This value is equivalent to the id field in the list of uploaded JARs (/jars). If this parameter is set while a JAR is provided together with the request, the request fails.
  - parallelism (optional): Positive integer value that specifies the desired parallelism for the job.
  - programArg (optional): Comma-separated list of program arguments.
  - savepointPath (optional): String value that specifies the path of the savepoint to restore the job from.
- Request:
```json
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunAsyncRequestBody",
  "properties" : {
    "allowNonRestoredState" : {
      "type" : "boolean"
    },
    "triggerId" : {
      "type" : "string"
    },
    "entryClass" : {
      "type" : "string"
    },
    "jarId" : {
      "type" : "string"
    },
    "jobId" : {
      "type" : "any"
    },
    "parallelism" : {
      "type" : "integer"
    },
    "programArgs" : {
      "type" : "string"
    },
    "programArgsList" : {
      "type" : "array",
      "items" : {
        "type" : "string"
      }
    },
    "restoreMode" : {
      "type" : "string",
      "enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
    },
    "savepointPath" : {
      "type" : "string"
    }
  }
}
```
- Response:
```json
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
  "properties" : {
    "request-id" : {
      "type" : "any"
    }
  }
}
```
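A client-side sketch of a submission, using the JDK's java.net.http API (Java 11+). It only builds the request, so it can be inspected without a running cluster. The class name RunAsyncClient, the example base URI http://localhost:8081, and the subset of body fields sent are assumptions; the field names follow the request schema above. Note that resolving the absolute path "/run-async" drops any path prefix of the base URI.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical client helper that builds (but does not send) a POST /run-async request. */
public final class RunAsyncClient {
    private RunAsyncClient() {}

    /** JSON string literal; escapes only backslashes and quotes, so control characters are assumed absent. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Request body using field names from the JarRunAsyncRequestBody schema. */
    static String requestBody(String jarId, String triggerId, int parallelism, List<String> programArgs) {
        String args = programArgs.stream().map(RunAsyncClient::quote).collect(Collectors.joining(","));
        return "{\"jarId\":" + quote(jarId)
                + ",\"triggerId\":" + quote(triggerId)
                + ",\"parallelism\":" + parallelism
                + ",\"programArgsList\":[" + args + "]}";
    }

    /** POST request against the REST base URI, e.g. http://localhost:8081 (an assumed address). */
    public static HttpRequest submit(URI restBase, String jarId, String triggerId,
                                     int parallelism, List<String> programArgs) {
        return HttpRequest.newBuilder(restBase.resolve("/run-async"))
                .header("Content-Type", "application/json")
                .timeout(Duration.ofSeconds(10))
                .POST(HttpRequest.BodyPublishers.ofString(
                        requestBody(jarId, triggerId, parallelism, programArgs)))
                .build();
    }
}
```

Sending it is then a matter of HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); the response body carries the request-id. Because the call returns as soon as the request is accepted, a retry after a network error can safely reuse the same triggerId.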
/run-async
- Verb: GET
- Response code: 200 OK
- Description: Returns an overview of all run requests and their current status.
- Request:
```json
{}
```
- Response:
```json
{
  "type" : "array",
  "items" : {
    "type" : "object",
    "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:RunAsyncStatus",
    "properties" : {
      "request-id" : {
        "type" : "any"
      },
      "status" : {
        "type" : "object",
        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
        "properties" : {
          "id" : {
            "type" : "string",
            "required" : true,
            "enum" : [ "IN_PROGRESS", "COMPLETED" ]
          }
        }
      }
    }
  }
}
```
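The list response pairs each request-id with a QueueStatus, which is enough for a client to decide which requests still need tracking. A hedged client-side sketch that turns the overview into status URLs for the pending requests; the Item type is a hypothetical hand-written mirror of the response schema (JSON parsing is omitted), and request IDs are assumed to be URL-safe.

```java
import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical client-side view of the GET /run-async response. */
public final class RunAsyncOverview {
    private RunAsyncOverview() {}

    /** Mirrors the QueueStatus "id" enum in the response schema. */
    public enum QueueStatus { IN_PROGRESS, COMPLETED }

    /** One element of the response array: "request-id" plus "status.id". */
    public static final class Item {
        final String requestId;
        final QueueStatus status;

        public Item(String requestId, QueueStatus status) {
            this.requestId = requestId;
            this.status = status;
        }
    }

    /** Builds GET /run-async/:triggerid URIs for every request that is still in progress. */
    public static List<URI> pendingStatusUris(URI restBase, List<Item> items) {
        return items.stream()
                .filter(item -> item.status == QueueStatus.IN_PROGRESS)
                .map(item -> restBase.resolve("/run-async/" + item.requestId))
                .collect(Collectors.toList());
    }
}
```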
/run-async/:triggerid
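- Verb: GET
- Response code: 200 OK
- Description: Returns the completion status of the job submission. Once the status is COMPLETED, the operation field contains the ID of the created job (jobid) or, if the submission failed, its failure cause (failure-cause).
- Path parameters:
  - triggerid: String value that identifies an asynchronous JAR run request. When an asynchronous JAR run request is submitted, an ID is assigned to it and returned to the client as request-id.
- Request: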
```json
{}
```
- Response:
```json
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult",
  "properties" : {
    "operation" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunResponseBody",
      "properties" : {
        "failure-cause" : {
          "type" : "any"
        },
        "jobid" : {
          "type" : "any"
        }
      }
    },
    "status" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
      "properties" : {
        "id" : {
          "type" : "string",
          "required" : true,
          "enum" : [ "IN_PROGRESS", "COMPLETED" ]
        }
      }
    }
  }
}
```
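Since submission no longer blocks, a client learns the outcome by polling this endpoint. A sketch of polling with capped exponential backoff; fetchStatus is a hypothetical stand-in for the GET /run-async/:triggerid call, and the injected Sleeper (also hypothetical) keeps the waiting strategy replaceable. The delays and attempt budget are illustrative, not values from the proposal.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical helper that polls GET /run-async/:triggerid with capped exponential backoff. */
public final class RunAsyncPoller {
    private RunAsyncPoller() {}

    /** Mirrors the QueueStatus "id" values in the status response. */
    public enum QueueStatus { IN_PROGRESS, COMPLETED }

    /** Injected so the waiting strategy can be replaced (e.g. in tests). */
    @FunctionalInterface
    public interface Sleeper {
        void sleep(Duration delay) throws InterruptedException;
    }

    /**
     * Polls until the status is COMPLETED, doubling the delay after each IN_PROGRESS answer up to maxDelay.
     * Returns the number of polls made, or -1 if maxAttempts was exhausted or the thread was interrupted.
     */
    public static int awaitCompletion(Supplier<QueueStatus> fetchStatus, Duration initialDelay,
                                      Duration maxDelay, int maxAttempts, Sleeper sleeper) {
        Duration delay = initialDelay;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (fetchStatus.get() == QueueStatus.COMPLETED) {
                return attempt; // the "operation" field can now be read
            }
            if (attempt == maxAttempts) {
                break; // do not sleep after the last poll
            }
            try {
                sleeper.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
            Duration doubled = delay.multipliedBy(2);
            delay = doubled.compareTo(maxDelay) > 0 ? maxDelay : doubled;
        }
        return -1;
    }
}
```

In production the sleeper could simply be `d -> Thread.sleep(d.toMillis())`. A client that gives up after the attempt budget can still stop the submission explicitly through the DELETE endpoint.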
/run-async/:triggerid
- Verb: DELETE
- Response code: 200 OK
- Description: Deletes a run request. If the run request is still in progress, it is cancelled as part of the deletion.
- Path parameters:
  - triggerid: String value that identifies an asynchronous JAR run request. When an asynchronous JAR run request is submitted, an ID is assigned to it and returned to the client as request-id.
- Request:
```json
{}
```
- Response:
```json
{}
```
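Cancelling an in-progress request means stopping the thread that is executing the user's main method. A JDK-only sketch, assuming that main method runs as a task on an ExecutorService: Future.cancel(true) on such a task interrupts the worker thread, whereas CompletableFuture.cancel ignores its mayInterruptIfRunning argument, so a CompletableFuture alone would not stop the running code. CancelOnDelete is a hypothetical name, and the sleeping task merely simulates a main method that never produces a job.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo: cancelling an in-progress run the way DELETE /run-async/:triggerid could. */
public final class CancelOnDelete {
    private CancelOnDelete() {}

    /** Starts a stand-in for a main() that never returns, cancels it, and reports whether it was interrupted. */
    public static boolean cancelBlockedRun() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Future<?> run = executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(TimeUnit.HOURS.toMillis(1)); // simulates main() stuck before creating a job
            } catch (InterruptedException e) {
                interrupted.set(true); // cooperative code can stop here
            } finally {
                finished.countDown();
            }
        });

        try {
            started.await();
            run.cancel(true); // interrupts the worker thread because the task is running
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
        return run.isCancelled() && interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted on cancel: " + cancelBlockedRun());
    }
}
```

Interruption is cooperative: a main method that swallows InterruptedException or blocks in non-interruptible I/O keeps running, so cancellation through deletion is best effort.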
...
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
...