Status

Current state:   [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink currently supports job submission through its /jars/:jarid/run API call. While processing the request, Flink runs the main method of a previously uploaded JAR. This method may contain operations that take arbitrarily long to complete, or it may hit genuine issues that prevent a job from ever being generated. In those cases, clients cannot cancel the ongoing job submission and block (until a timeout is reached) before realizing that no progress is being made.

Therefore, we propose a new set of API calls to support asynchronous job submission: creation, listing, completion tracking and deletion of job submissions. This decouples the client from the passive wait and gives it more control over job submissions.

Some of the design is similar to what Flink currently uses for savepoint submissions, in an attempt to reuse the existing setup for long-running operations.

Public Interfaces

Request Handlers

The current synchronous job submission API should remain unchanged for backwards compatibility. Therefore, the only expected change to existing classes is in WebSubmissionExtension, which must register the handlers needed for the asynchronous job submission.

Code Block
language: java
title: WebSubmissionExtension
package org.apache.flink.runtime.webmonitor;

public class WebSubmissionExtension implements WebMonitorExtension {
	WebSubmissionExtension(
			Configuration configuration,
			GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
			Map<String, String> responseHeaders,
			CompletableFuture<String> localAddressFuture,
			Path jarDir,
			Executor executor,
			Time timeout,
			Supplier<ApplicationRunner> applicationRunnerSupplier)
			throws Exception {
		// Instantiate one handler per asynchronous job submission operation.
		final SubmitJarRunAsyncHandler submitJarRunAsyncHandler = new SubmitJarRunAsyncHandler(...);
		final ListJarRunAsyncHandler listJarRunAsyncHandler = new ListJarRunAsyncHandler(...);
		final StatusJarRunAsyncHandler statusJarRunAsyncHandler = new StatusJarRunAsyncHandler(...);
		final DeleteJarRunAsyncHandler deleteJarRunAsyncHandler = new DeleteJarRunAsyncHandler(...);

		// Register each handler together with the headers of its API call.
		webSubmissionHandlers
			.add(Tuple2.of(SubmitJarRunAsyncHeaders.getInstance(), submitJarRunAsyncHandler));
		webSubmissionHandlers
			.add(Tuple2.of(StatusJarRunAsyncHeaders.getInstance(), statusJarRunAsyncHandler));
		webSubmissionHandlers
			.add(Tuple2.of(ListJarRunAsyncHeaders.getInstance(), listJarRunAsyncHandler));
		webSubmissionHandlers
			.add(Tuple2.of(DeleteJarRunAsyncHeaders.getInstance(), deleteJarRunAsyncHandler));
	}
}

Apart from the headers of the new API calls, the existing JarRunRequestBody should be extended to include a triggerId and a jarId. The triggerId should be a unique value generated by the client and used by Flink to identify retries of a request (similar to an idempotency token). The jarId refers to a previously uploaded JAR. This is shown in JarRunAsyncRequestBody below:

Code Block
language: java
title: JarRunAsyncRequestBody
package org.apache.flink.runtime.webmonitor.handlers;

public class JarRunAsyncRequestBody extends JarRunRequestBody {
	public JarRunAsyncRequestBody(String entryClassName, String programArguments,
		List<String> programArgumentsList, Integer parallelism, JobID jobId,
		Boolean allowNonRestoredState, String savepointPath,
		RestoreMode restoreMode, String triggerId, String jarId);

	public String getTriggerId();

	public String getJarId();
}
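
As an illustration of the client side, the sketch below generates a triggerId that satisfies the 64-ASCII-character length stated for the triggerId query parameter. The class name TriggerIds and the choice of lowercase hexadecimal are assumptions made for this example; the proposal only requires a unique value generated by the client.

```java
import java.security.SecureRandom;

// Hypothetical client-side helper; not part of the proposed Flink classes.
final class TriggerIds {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    private TriggerIds() {}

    /** Returns 32 random bytes encoded as a 64-character lowercase hex string. */
    static String newTriggerId() {
        byte[] bytes = new byte[32];
        RANDOM.nextBytes(bytes);
        StringBuilder sb = new StringBuilder(64);
        for (byte b : bytes) {
            // Emit the high nibble first, then the low nibble.
            sb.append(HEX[(b >> 4) & 0xF]).append(HEX[b & 0xF]);
        }
        return sb.toString();
    }
}
```

A client would generate the value once per logical submission and reuse it on every retry, so that Flink can recognize the retries as the same request.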

As with SavepointHandlers, the new handlers will be grouped under a single class, JarRunAsyncHandlers, and each will specialize the JarRunAsyncHandlerBase class.

Code Block
language: java
title: JarRunAsyncHandlers
package org.apache.flink.runtime.webmonitor.handlers;

public class JarRunAsyncHandlers {
	private abstract static class JarRunAsyncHandlerBase { ... }

	public class SubmitJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }

	public class ListJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }

	public class StatusJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }

	public class DeleteJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }
}

REST API Contracts

The following are the REST API contracts for the submission, listing, tracking and deletion of the asynchronous operations, respectively.

/run-async

  • Verb: POST
  • Response code: 200 OK
  • Description: Submits a job by running either a JAR previously uploaded via /jars/upload or a JAR uploaded alongside the request (in the latter case, the JAR is not kept after the request finishes). Program arguments can be passed either via the JSON request (recommended) or via query parameters. It differs from /jars/:jarid/run in that the job is submitted in the background without blocking the client. If there is an ongoing run request with the same triggerId and the same remaining run configuration, the second request will be ignored.
  • Query parameters:
    • allowNonRestoredState (optional): Boolean value that specifies whether the job submission should be rejected if the savepoint contains state that cannot be mapped back to the job.
    • entry-class (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the JAR file manifest.
    • triggerId (optional): String value of 64 ASCII characters that specifies the idempotency token.
    • jarId (optional): String value that identifies a JAR. When uploading the JAR a path is returned, where the filename is the ID. This value is equivalent to the id field in the list of uploaded jars (/jars). If this parameter is set when a JAR is provided together with the request, the request will fail.
    • parallelism (optional): Positive integer value that specifies the desired parallelism for the job.
    • programArg (optional): Comma-separated list of program arguments.
    • savepointPath (optional): String value that specifies the path of the savepoint to restore the job from.
  • Request:
Code Block
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunAsyncRequestBody",
	"properties" : {
		"allowNonRestoredState" : {
			"type" : "boolean"
		},
		"triggerId" : {
			"type" : "string"
		},
		"entryClass" : {
			"type" : "string"
		},
		"jarId" : {
			"type" : "string"
		},
		"jobId" : {
			"type" : "any"
		},
		"parallelism" : {
			"type" : "integer"
		},
		"programArgs" : {
			"type" : "string"
		},
		"programArgsList" : {
			"type" : "array",
			"items" : {
				"type" : "string"
			}
		},
		"restoreMode" : {
			"type" : "string",
			"enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
		},
		"savepointPath" : {
			"type" : "string"
		}
	}
}
  • Response:
Code Block
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
	"properties" : {
		"request-id" : {
			"type" : "any"
		}
	}
}
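
To make the contract above more concrete, here is a minimal sketch of how a client might build the POST /run-async request for a previously uploaded JAR, using the JDK's java.net.http API. The class RunAsyncRequests, the base URL, and the sample values are assumptions for illustration only; the JSON field names are taken from the request schema.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client-side helper; not part of the proposed Flink classes.
final class RunAsyncRequests {
    private RunAsyncRequests() {}

    /**
     * Builds (but does not send) a POST /run-async request.
     * Assumes triggerId and jarId contain no characters that need JSON escaping.
     */
    static HttpRequest submit(String baseUrl, String triggerId, String jarId, int parallelism) {
        String json = String.format(
                "{\"triggerId\":\"%s\",\"jarId\":\"%s\",\"parallelism\":%d}",
                triggerId, jarId, parallelism);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/run-async"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

Sending the request with java.net.http.HttpClient and retrying it with the same triggerId after a timeout should, under this proposal, not start a second submission.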

/run-async

  • Verb: GET
  • Response code: 200 OK
  • Description: Returns an overview of all run requests and their current status.
  • Request:
Code Block
{}
  • Response:
Code Block
{
	"type" : "array",
	"items" : {
		"type" : "object",
		"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:RunAsyncStatus",
		"properties" : {
			"request-id" : {
				"type" : "any"
			},
			"status" : {
				"type" : "object",
				"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
				"properties" : {
					"id" : {
						"type" : "string",
						"required" : true,
						"enum" : [ "IN_PROGRESS", "COMPLETED" ]
					}
				}
			}
		}
	}
}

/run-async/:triggerid

  • Verb: GET
  • Response code: 200 OK
  • Description: Returns the completion status of the job creation.
  • Path parameters:
    • triggerid - String value that identifies an asynchronous JAR run request. When making an asynchronous JAR run request, an ID is assigned to the request and returned to the client.
  • Request:
Code Block
{}
  • Response:
Code Block
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult",
	"properties" : {
		"operation" : {
			"type" : "object",
			"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunResponseBody",
			"properties" : {
				"failure-cause" : {
					"type" : "any"
				},
				"jobid" : {
					"type" : "any"
				}
			}
		},
		"status" : {
			"type" : "object",
			"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
			"properties" : {
				"id" : {
					"type" : "string",
					"required" : true,
					"enum" : [ "IN_PROGRESS", "COMPLETED" ]
				}
			}
		}
	}
}
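
A client would typically poll this endpoint until the status id changes from IN_PROGRESS to COMPLETED. The sketch below shows one possible polling loop. RunAsyncPoller and the fetchStatusId hook are hypothetical: the hook stands in for whatever code performs GET /run-async/:triggerid and extracts status.id from the response.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical client-side helper; not part of the proposed Flink classes.
final class RunAsyncPoller {
    private RunAsyncPoller() {}

    /**
     * Calls fetchStatusId up to maxAttempts times, sleeping for delay between calls.
     * Returns true as soon as the status id is "COMPLETED", false if the attempts
     * run out or the thread is interrupted while waiting.
     */
    static boolean awaitCompletion(Supplier<String> fetchStatusId, int maxAttempts, Duration delay) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if ("COMPLETED".equals(fetchStatusId.get())) {
                return true;
            }
            if (attempt < maxAttempts - 1) {
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag for the caller and stop polling.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

Note that COMPLETED only signals that the submission finished. Based on the response schema, the client would still need to inspect the operation object to tell a jobid apart from a failure-cause.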

/run-async/:triggerid

  • Verb: DELETE
  • Response code: 200 OK
  • Description: Deletes a run request. If a run request is ongoing, it will be cancelled as part of the deletion.
  • Path parameters:
    • triggerid - String value that identifies an asynchronous JAR run request. When making an asynchronous JAR run request, an ID is assigned to the request and returned to the client.
  • Request:
Code Block
{}
  • Response:
Code Block
{}
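
For completeness, a client could cancel and remove a run request along the following lines. As before, RunAsyncDeletes and the base URL are assumptions for this example, and the request is only built, not sent.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical client-side helper; not part of the proposed Flink classes.
final class RunAsyncDeletes {
    private RunAsyncDeletes() {}

    /** Builds (but does not send) a DELETE /run-async/:triggerid request. */
    static HttpRequest delete(String baseUrl, String triggerId) {
        // URLEncoder applies form encoding, which is sufficient for plain
        // alphanumeric trigger IDs; other IDs would need proper path encoding.
        String encoded = URLEncoder.encode(triggerId, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/run-async/" + encoded))
                .DELETE()
                .build();
    }
}
```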

...

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

...