Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink currently supports job submission through its /jars/:jarid/run API call. While processing the request, Flink runs the main method of a previously submitted JAR. This method may contain operations that take arbitrarily long to complete, or genuine bugs that will never produce a job. In those cases, clients cannot cancel the ongoing submission and will block (until a timeout is reached) before realizing that no progress is being made.

Therefore, we propose a new set of API calls to support asynchronous job submission: creation, listing, completion tracking and deletion of job submissions. This removes the passive wait and gives clients greater control over their job submissions.

Some of the design is similar to what Flink currently uses for savepoint submissions, in an attempt to reuse the existing setup for long-running operations.

Public Interfaces

Request Handlers

The current synchronous job submission API should not be changed, to preserve backwards compatibility. Therefore, the only expected change to the existing classes is in WebSubmissionExtension, which must register the handlers needed for the asynchronous job submission.

WebSubmissionExtension
package org.apache.flink.runtime.webmonitor;

public class WebSubmissionExtension implements WebMonitorExtension {
	WebSubmissionExtension(Configuration configuration,
		GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
		Map<String, String> responseHeaders, CompletableFuture<String> localAddressFuture,
		Path jarDir, Executor executor, Time timeout, Supplier<ApplicationRunner> applicationRunnerSupplier)
		throws Exception {
			final SubmitJarRunAsyncHandler submitJarRunAsyncHandler = new SubmitJarRunAsyncHandler(...);
			final ListJarRunAsyncHandler listJarRunAsyncHandler = new ListJarRunAsyncHandler(...);
			final StatusJarRunAsyncHandler statusJarRunAsyncHandler = new StatusJarRunAsyncHandler(...);
			final DeleteJarRunAsyncHandler deleteJarRunAsyncHandler = new DeleteJarRunAsyncHandler(...);

			webSubmissionHandlers
				.add(Tuple2.of(SubmitJarRunAsyncHeaders.getInstance(), submitJarRunAsyncHandler));
			webSubmissionHandlers
				.add(Tuple2.of(StatusJarRunAsyncHeaders.getInstance(), statusJarRunAsyncHandler));
			webSubmissionHandlers
				.add(Tuple2.of(ListJarRunAsyncHeaders.getInstance(), listJarRunAsyncHandler));
			webSubmissionHandlers
				.add(Tuple2.of(DeleteJarRunAsyncHeaders.getInstance(), deleteJarRunAsyncHandler));
	}
}

Besides the headers of the new API calls, the existing JarRunRequestBody should be extended to include a triggerId and a jarId. The triggerId should be a unique value generated by the client and used by Flink to identify retries of a request (similar to an idempotency token). jarId refers to a previously uploaded JAR. This is shown in JarRunAsyncRequestBody below:

JarRunAsyncRequestBody
package org.apache.flink.runtime.webmonitor.handlers;

public class JarRunAsyncRequestBody extends JarRunRequestBody {
	public JarRunAsyncRequestBody(String entryClassName, String programArguments,
		List<String> programArgumentsList, Integer parallelism, JobID jobId,
		Boolean allowNonRestoredState, String savepointPath,
		RestoreMode restoreMode, String triggerId, String jarId);

	public String getTriggerId();

	public String getJarId();
}
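The triggerId is only constrained to be a 64-character ASCII string that is unique per logical request. As a sketch of client-side generation, a client could produce one as follows; the alphabet and RNG below are illustrative choices, not part of the API contract:

```java
import java.security.SecureRandom;

// Hypothetical client-side helper for generating a triggerId. The proposal
// only requires 64 ASCII characters unique per logical request; everything
// else here (class name, alphabet, SecureRandom) is an illustrative choice.
public class TriggerIds {
    private static final String ALPHABET =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    private static final SecureRandom RANDOM = new SecureRandom();

    public static String newTriggerId() {
        StringBuilder sb = new StringBuilder(64);
        for (int i = 0; i < 64; i++) {
            sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }
}
```

The client must reuse the same triggerId when retrying, so the value should be generated once per logical submission and persisted for the duration of the retry loop.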

Just like what happens with SavepointHandlers, the JarRunAsyncHandlers will be grouped under a single class, with each handler extending the JarRunAsyncHandlerBase class.

JarRunAsyncHandlers
package org.apache.flink.runtime.webmonitor.handlers;

public class JarRunAsyncHandlers {
	private abstract static class JarRunAsyncHandlerBase { ... }

	public class SubmitJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }

	public class ListJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }

	public class StatusJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }

	public class DeleteJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }
}

REST API Contracts

The following are the REST API contracts for the submission, listing, tracking and deletion of the asynchronous operations, respectively.

/run-async

  • Verb: POST
  • Response code: 200 OK
  • Description: Submits a job by running either a JAR previously uploaded via /jars/upload or a JAR being uploaded alongside the request (this JAR is not kept after the latter request finishes). Program arguments can be passed either via the JSON request body (recommended) or via query parameters. It differs from /jars/:jarid/run in that the job is submitted in the background without blocking the client. If there is an ongoing run request with the same triggerId and an otherwise identical run configuration, the second request will be ignored.
  • Query parameters:
    • allowNonRestoredState (optional): Boolean value that specifies whether the job submission should be rejected if the savepoint contains state that cannot be mapped back to the job.
    • entry-class (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the JAR file manifest.
    • triggerId (optional): String value of exactly 64 ASCII characters that serves as the idempotency token.
    • jarId (optional): String value that identifies a JAR. When uploading the JAR a path is returned, where the filename is the ID. This value is equivalent to the id field in the list of uploaded jars (/jars). If this parameter is set when a JAR is provided together with the request, the request will fail.
    • parallelism (optional): Positive integer value that specifies the desired parallelism for the job.
    • programArg (optional): Comma-separated list of program arguments.
    • savepointPath (optional): String value that specifies the path of the savepoint to restore the job from.
  • Request:
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunAsyncRequestBody",
	"properties" : {
		"allowNonRestoredState" : {
			"type" : "boolean"
		},
		"triggerId" : {
			"type" : "string"
		},
		"entryClass" : {
			"type" : "string"
		},
		"jarId" : {
			"type" : "string"
		},
		"jobId" : {
			"type" : "any"
		},
		"parallelism" : {
			"type" : "integer"
		},
		"programArgs" : {
			"type" : "string"
		},
		"programArgsList" : {
			"type" : "array",
			"items" : {
				"type" : "string"
			}
		},
		"restoreMode" : {
			"type" : "string",
			"enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
		},
		"savepointPath" : {
			"type" : "string"
		}
	}
}
  • Response:
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
	"properties" : {
		"request-id" : {
			"type" : "any"
		}
	}
}
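As an illustration of the schema above, a client submitting a previously uploaded JAR could build a request body like the following. The JSON is assembled by hand purely for the sketch (a real client would typically use a JSON library such as Jackson), and the class and method names are hypothetical:

```java
// Hypothetical helper that builds a minimal /run-async request body for a
// previously uploaded JAR. Field names match the JSON schema in this
// proposal; hand-rolled string concatenation is for illustration only.
public class RunAsyncRequest {
    public static String body(String triggerId, String jarId, int parallelism) {
        return "{"
            + "\"triggerId\":\"" + triggerId + "\","
            + "\"jarId\":\"" + jarId + "\","
            + "\"parallelism\":" + parallelism
            + "}";
    }
}
```

The resulting string would be POSTed to /run-async; the TriggerResponse returned by the server echoes the request-id to be used for completion tracking.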

/run-async

  • Verb: GET
  • Response code: 200 OK
  • Description: Returns an overview of all run requests and their current status.
  • Request:
{}
  • Response:
{
	"type" : "array",
	"items" : {
		"type" : "object",
		"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:RunAsyncStatus",
		"properties" : {
			"request-id" : {
				"type" : "any"
			},
			"status" : {
				"type" : "object",
				"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
				"properties" : {
					"id" : {
						"type" : "string",
						"required" : true,
						"enum" : [ "IN_PROGRESS", "COMPLETED" ]
					}
				}
			}
		}
	}
}

/run-async/:triggerid

  • Verb: GET
  • Response code: 200 OK
  • Description: Returns the completion status of the job creation.
  • Path parameters:
    • triggerid - String value that identifies an asynchronous JAR run request. When making an asynchronous JAR run request, an ID is assigned to the request and returned to the client.
  • Request:
{}
  • Response:
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult",
	"properties" : {
		"operation" : {
			"type" : "object",
			"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunResponseBody",
			"properties" : {
				"failure-cause" : {
					"type" : "any"
				},
				"jobid" : {
					"type" : "any"
				}
			}
		},
		"status" : {
			"type" : "object",
			"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
			"properties" : {
				"id" : {
					"type" : "string",
					"required" : true,
					"enum" : [ "IN_PROGRESS", "COMPLETED" ]
				}
			}
		}
	}
}
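Since completion is reported through the QueueStatus id, clients are expected to poll this endpoint until it transitions from IN_PROGRESS to COMPLETED. A minimal sketch of such a polling loop, with the HTTP call abstracted behind a supplier (all names here are illustrative, not Flink API):

```java
import java.util.function.Supplier;

// Hypothetical client-side polling loop over GET /run-async/:triggerid.
// The statusFetcher stands in for the actual HTTP call and returns the
// QueueStatus id, i.e. "IN_PROGRESS" or "COMPLETED".
public class RunAsyncPoller {
    public static int pollUntilComplete(Supplier<String> statusFetcher, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if ("COMPLETED".equals(statusFetcher.get())) {
                return attempt; // number of polls needed
            }
            try {
                Thread.sleep(10); // back off between polls; tune for real deployments
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", ie);
            }
        }
        throw new IllegalStateException("still IN_PROGRESS after " + maxAttempts + " polls");
    }
}
```

Once COMPLETED is observed, the operation field of the response carries either the jobid or the failure-cause, so the same poll distinguishes success from failure.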

/run-async/:triggerid

  • Verb: DELETE
  • Response code: 200 OK
  • Description: Deletes a run request. If a run request is ongoing, it will be cancelled as part of the deletion.
  • Path parameters:
    • triggerid - String value that identifies an asynchronous JAR run request. When making an asynchronous JAR run request, an ID is assigned to the request and returned to the client.
  • Request:
{}
  • Response:
{}

Proposed Changes

One update we need to make is to register the handlers for the asynchronous job submission API calls in the WebSubmissionExtension class.

To develop the proposed feature, we plan to reuse components of two existing features: job and savepoint submissions. Nearly all background processing of job submissions should follow the existing approach, while the asynchronous workflow is based on that of savepoint submissions.

To fulfill separate use cases, clients can choose between using a previously uploaded JAR (through jarId) or providing the JAR in the request itself. The former should start processing the run request sooner, since transferring the JAR is not part of the execution path. Updating a running application could follow this optimization strategy, as the incurred downtime would then be independent of how long the JAR takes to upload. Clients starting their application for the first time, however, can now make a single API call with all the information needed to start it.

Given the asynchronous nature of the design, clients should call /run-async/:triggerid to inquire about the state of a previously submitted run request. This is essential to understand whether the run request completed successfully or there was an issue that needs to be addressed. Issues can be of various types, for instance: the request lacks information (e.g. neither a jarId nor a JAR was uploaded with the request), the provided jarId does not exist, the code in the JAR is faulty, or the JobManager failed (possibly for reasons unrelated to the run request) and is unable to complete it. Clients are responsible for retrying requests once the issues are addressed.

Compatibility, Deprecation, and Migration Plan

Existing users will not be impacted by the change, because the synchronous job submission remains unaffected. However, they will now have a choice between simplicity and a higher level of control over their job submissions, which is why there are no plans to phase out or remove the synchronous job submission.

Test Plan

We plan to start by adapting the existing tests for the synchronous job submission.

For the positive test cases, the new workflow consists of a job submission followed by polling of the completion tracking call for the associated triggerId. Assertions over the correctness of the running job should be identical to the synchronous submission, given that the workflows would have been merged at this stage. Failures raised due to problems in the provided inputs come from the job submission call. Failures that occur during the processing of a request are identified through the completion tracking calls.

To complete the testing of the job submission call, we plan to focus on the upload of a JAR alongside the request and the use of a triggerId for idempotency purposes. On the former, the priority is to guarantee that the JAR’s lifetime is limited to the duration of the job submission request (i.e. the JAR is disposed after the job creation finishes either successfully or due to an error). For the latter, we need to guarantee that duplicate requests are processed only once.
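The idempotency guarantee boils down to the server registering at most one submission per triggerId, with retries observing the original. A sketch of such deduplication, using an atomic put-if-absent (the class and method names are illustrative, not Flink's actual implementation):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of triggerId-based deduplication: the first request
// for a triggerId wins, and retries with the same triggerId are ignored,
// mirroring the idempotency behaviour described in this proposal.
public class TriggerDeduplicator {
    private final ConcurrentMap<String, String> inFlight = new ConcurrentHashMap<>();

    /** Returns true only if this call registered a new submission for triggerId. */
    public boolean tryRegister(String triggerId, String runConfiguration) {
        return inFlight.putIfAbsent(triggerId, runConfiguration) == null;
    }

    /** Called when a run request is deleted, freeing the triggerId. */
    public void delete(String triggerId) {
        inFlight.remove(triggerId);
    }
}
```

Tests for this behaviour would submit the same request twice, assert that only one job is created, then delete the request and assert that the triggerId becomes usable again.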

The remaining tests are spread across the other three API calls. We intend to exercise both the completion tracking and listing API calls to externally assess the state of the system; assertions over the reported state allow us to verify that the feature is behaving as expected. Regarding deletion of job submissions, the main concerns are the deletion of completed versus ongoing submissions. Deleting a completed job submission removes the tracking information for the given triggerId. For ongoing job submissions, there is the additional work of halting the background computation and deleting any JAR that was uploaded alongside the asynchronous job submission.