Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink currently supports job submission through its /jars/:jarid/run API call. While processing the request, Flink runs the main method of a previously submitted JAR. This method may contain operations that take arbitrarily long to complete, or genuine issues that prevent a job from ever being generated. In those cases, clients cannot cancel the ongoing job submission and block (until a timeout is reached) before realizing that no progress is being made.

Therefore, we propose a new set of API calls to support asynchronous job submission: creation, listing, completion tracking, and deletion of job submissions. This decouples clients from the passive wait and gives them more control over job submissions.

Parts of the design are similar to what Flink currently uses for savepoint submissions, in an attempt to reuse the existing setup for long-running operations.

Public Interfaces

Request Handlers

The current synchronous job submission API should remain unchanged to preserve backwards compatibility. Therefore, the only change expected in existing classes is in WebSubmissionExtension, which registers the handlers needed for the asynchronous job submission.

WebSubmissionExtension
package org.apache.flink.runtime.webmonitor;

public class WebSubmissionExtension implements WebMonitorExtension {
	WebSubmissionExtension(
			Configuration configuration,
			GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
			Map<String, String> responseHeaders,
			CompletableFuture<String> localAddressFuture,
			Path jarDir,
			Executor executor,
			Time timeout,
			Supplier<ApplicationRunner> applicationRunnerSupplier)
			throws Exception {
		// New handlers for the asynchronous job submission API.
		final SubmitJarRunAsyncHandler submitJarRunAsyncHandler = new SubmitJarRunAsyncHandler(...);
		final ListJarRunAsyncHandler listJarRunAsyncHandler = new ListJarRunAsyncHandler(...);
		final StatusJarRunAsyncHandler statusJarRunAsyncHandler = new StatusJarRunAsyncHandler(...);
		final DeleteJarRunAsyncHandler deleteJarRunAsyncHandler = new DeleteJarRunAsyncHandler(...);

		// Register each handler together with its message headers.
		webSubmissionHandlers
			.add(Tuple2.of(SubmitJarRunAsyncHeaders.getInstance(), submitJarRunAsyncHandler));
		webSubmissionHandlers
			.add(Tuple2.of(StatusJarRunAsyncHeaders.getInstance(), statusJarRunAsyncHandler));
		webSubmissionHandlers
			.add(Tuple2.of(ListJarRunAsyncHeaders.getInstance(), listJarRunAsyncHandler));
		webSubmissionHandlers
			.add(Tuple2.of(DeleteJarRunAsyncHeaders.getInstance(), deleteJarRunAsyncHandler));
	}
}

In addition to the headers of the new API calls, the request body needs two fields beyond those of the existing JarRunRequestBody: a triggerId and a jarId. The triggerId is a unique value generated by the client and used by Flink to identify retries of a request (similar to an idempotency token). The jarId refers to a previously uploaded JAR. Both are shown in JarRunAsyncRequestBody below:

JarRunAsyncRequestBody
package org.apache.flink.runtime.webmonitor.handlers;

public class JarRunAsyncRequestBody extends JarRunRequestBody {
	public JarRunAsyncRequestBody(String entryClassName, String programArguments,
		List<String> programArgumentsList, Integer parallelism, JobID jobId,
		Boolean allowNonRestoredState, String savepointPath,
		RestoreMode restoreMode, String triggerId, String jarId);

	public String getTriggerId();

	public String getJarId();
}
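
The idempotency role of triggerId can be illustrated with a minimal sketch. It assumes an in-memory registry keyed by triggerId; the class TriggerRegistrySketch and its submit method are hypothetical names used for illustration only and are not part of Flink. For brevity the sketch compares only the triggerId, not the remaining run configuration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch: deduplicates submissions by a client-generated triggerId. */
final class TriggerRegistrySketch {
    // One future per triggerId; a retry with the same token reuses the existing entry.
    private final Map<String, CompletableFuture<String>> submissions = new ConcurrentHashMap<>();

    /**
     * Starts the submission only if the triggerId has not been seen before.
     * The supplier stands in for running the JAR's main method and returns a job ID.
     */
    CompletableFuture<String> submit(String triggerId, Supplier<String> runJar) {
        return submissions.computeIfAbsent(triggerId, id -> CompletableFuture.supplyAsync(runJar));
    }

    public static void main(String[] args) {
        TriggerRegistrySketch registry = new TriggerRegistrySketch();
        AtomicInteger runs = new AtomicInteger();
        Supplier<String> runJar = () -> "job-" + runs.incrementAndGet();

        CompletableFuture<String> first = registry.submit("token-a", runJar);
        CompletableFuture<String> retry = registry.submit("token-a", runJar);

        // The retry maps to the first submission, so the stand-in JAR runs only once.
        System.out.println("same submission: " + (first == retry));
        System.out.println("job id: " + first.join());
        System.out.println("runs: " + runs.get());
    }
}
```

Using computeIfAbsent keeps the check and the registration atomic, so two concurrent retries with the same token cannot both start a run.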

As with SavepointHandlers, the new handlers will be grouped under a single class, JarRunAsyncHandlers, and each of them will specialize the JarRunAsyncHandlerBase class.

JarRunAsyncHandlers
package org.apache.flink.runtime.webmonitor.handlers;

public class JarRunAsyncHandlers {
	private abstract static class JarRunAsyncHandlerBase { ... }

	public class SubmitJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }

	public class ListJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }

	public class StatusJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }

	public class DeleteJarRunAsyncHandler extends JarRunAsyncHandlerBase { ... }
}

REST API Contracts

The following are the REST API contracts for the submission, listing, tracking and deletion of the asynchronous operations, respectively.

/run-async

  • Verb: POST
  • Response code: 200 OK
  • Description: Submits a job by running either a JAR previously uploaded via /jars/upload or a JAR uploaded alongside the request (in the latter case, the JAR is not kept after the request finishes). Program arguments can be passed either via the JSON request (recommended) or via query parameters. It differs from /jars/:jarid/run in that the job is submitted in the background without blocking the client. If there is an ongoing run request with the same triggerId and the same remaining run configuration, the second request will be ignored.
  • Query parameters:
    • allowNonRestoredState (optional): Boolean value that specifies whether the job submission should be rejected if the savepoint contains state that cannot be mapped back to the job.
    • entry-class (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the JAR file manifest.
    • triggerId (optional): String value, 64 ASCII characters long, that serves as the idempotency token.
    • jarId (optional): String value that identifies a JAR. When uploading the JAR a path is returned, where the filename is the ID. This value is equivalent to the id field in the list of uploaded jars (/jars). If this parameter is set when a JAR is provided together with the request, the request will fail.
    • parallelism (optional): Positive integer value that specifies the desired parallelism for the job.
    • programArg (optional): Comma-separated list of program arguments.
    • savepointPath (optional): String value that specifies the path of the savepoint to restore the job from.
  • Request:
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunAsyncRequestBody",
	"properties" : {
		"allowNonRestoredState" : {
			"type" : "boolean"
		},
		"triggerId" : {
			"type" : "string"
		},
		"entryClass" : {
			"type" : "string"
		},
		"jarId" : {
			"type" : "string"
		},
		"jobId" : {
			"type" : "any"
		},
		"parallelism" : {
			"type" : "integer"
		},
		"programArgs" : {
			"type" : "string"
		},
		"programArgsList" : {
			"type" : "array",
			"items" : {
				"type" : "string"
			}
		},
		"restoreMode" : {
			"type" : "string",
			"enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
		},
		"savepointPath" : {
			"type" : "string"
		}
	}
}
  • Response:
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
	"properties" : {
		"request-id" : {
			"type" : "any"
		}
	}
}
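
From the client side, a submission against this contract could look like the following sketch. It only builds the request and does not send it. The class RunAsyncRequestSketch, the base URL, and all field values are assumptions made for illustration; the JSON is assembled by hand without escaping, which a real client would delegate to a JSON library.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical client-side sketch for POST /run-async; nothing is sent over the network. */
final class RunAsyncRequestSketch {
    /**
     * Builds a JSON body using a subset of the fields in the proposed request schema.
     * Values are inserted verbatim, so they must not contain characters that need escaping.
     */
    static String body(String triggerId, String jarId, String entryClass, int parallelism) {
        return String.format(
                "{\"triggerId\":\"%s\",\"jarId\":\"%s\",\"entryClass\":\"%s\",\"parallelism\":%d}",
                triggerId, jarId, entryClass, parallelism);
    }

    /** Builds the POST request against the proposed /run-async endpoint. */
    static HttpRequest request(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/run-async"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder values: a 64-character token, a made-up JAR ID, and an example entry class.
        String json = body("a".repeat(64), "my-job.jar", "org.example.Main", 2);
        HttpRequest request = request("http://localhost:8081", json);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(json);
    }
}
```

A client that retries after a network error would resend the same body, including the same triggerId, so that the server can recognize the retry.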

/run-async

  • Verb: GET
  • Response code: 200 OK
  • Description: Returns an overview of all run requests and their current status.
  • Request:
{}
  • Response:
{
	"type" : "array",
	"items" : {
		"type" : "object",
		"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:RunAsyncStatus",
		"properties" : {
			"request-id" : {
				"type" : "any"
			},
			"status" : {
				"type" : "object",
				"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
				"properties" : {
					"id" : {
						"type" : "string",
						"required" : true,
						"enum" : [ "IN_PROGRESS", "COMPLETED" ]
					}
				}
			}
		}
	}
}

/run-async/:triggerid

  • Verb: GET
  • Response code: 200 OK
  • Description: Returns the completion status of the job creation.
  • Path parameters:
    • triggerid - String value that identifies an asynchronous JAR run request. When making an asynchronous JAR run request, an ID is assigned to the request and returned to the client.
  • Request:
{}
  • Response:
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult",
	"properties" : {
		"operation" : {
			"type" : "object",
			"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunResponseBody",
			"properties" : {
				"failure-cause" : {
					"type" : "any"
				},
				"jobid" : {
					"type" : "any"
				}
			}
		},
		"status" : {
			"type" : "object",
			"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
			"properties" : {
				"id" : {
					"type" : "string",
					"required" : true,
					"enum" : [ "IN_PROGRESS", "COMPLETED" ]
				}
			}
		}
	}
}
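
Completion tracking is expected to be driven by client-side polling. The following sketch shows one way to do that, assuming a bounded number of attempts with a fixed pause between them. The class RunAsyncPollerSketch and its parameters are hypothetical, and the status source is a stub standing in for the actual GET call.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch: polls a status source until the run request reports COMPLETED. */
final class RunAsyncPollerSketch {
    /** Mirrors the two values of the QueueStatus "id" enum in the response schema. */
    enum QueueStatusId { IN_PROGRESS, COMPLETED }

    /**
     * Calls fetchStatus up to maxAttempts times, sleeping between attempts.
     * Returns true once COMPLETED is observed, and false if the attempts run out
     * or the calling thread is interrupted.
     */
    static boolean awaitCompletion(Supplier<QueueStatusId> fetchStatus, int maxAttempts, long pauseMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (fetchStatus.get() == QueueStatusId.COMPLETED) {
                return true;
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stub standing in for GET /run-async/:triggerid; a real client would issue an HTTP call.
        Iterator<QueueStatusId> responses = List.of(
                QueueStatusId.IN_PROGRESS, QueueStatusId.IN_PROGRESS, QueueStatusId.COMPLETED).iterator();
        System.out.println("completed: " + awaitCompletion(responses::next, 5, 10L));
    }
}
```

Per the response schema, COMPLETED only signals that the operation has finished; the client would then inspect the operation object for either a jobid or a failure-cause.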

/run-async/:triggerid

  • Verb: DELETE
  • Response code: 200 OK
  • Description: Deletes a run request. If a run request is ongoing, it will be cancelled as part of the deletion.
  • Path parameters:
    • triggerid - String value that identifies an asynchronous JAR run request. When making an asynchronous JAR run request, an ID is assigned to the request and returned to the client.
  • Request:
{}
  • Response:
{}
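
The deletion semantics (remove the request and cancel it if it is still running) can be sketched as follows. This is an illustration under assumptions, not the proposed implementation: the class RunRequestDeletionSketch and its methods are hypothetical, and run requests are modelled as CompletableFuture instances held in an in-memory map.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: deleting a run request cancels it if it is still ongoing. */
final class RunRequestDeletionSketch {
    private final Map<String, CompletableFuture<String>> requests = new ConcurrentHashMap<>();

    /** Registers a run request; the future completes with the job ID. */
    void register(String triggerId, CompletableFuture<String> submission) {
        requests.put(triggerId, submission);
    }

    /**
     * Removes the request and cancels it when it has not completed yet.
     * Returns false if no request is known under the given triggerId.
     */
    boolean delete(String triggerId) {
        CompletableFuture<String> submission = requests.remove(triggerId);
        if (submission == null) {
            return false;
        }
        // cancel has no effect on a future that has already completed.
        submission.cancel(true);
        return true;
    }

    boolean contains(String triggerId) {
        return requests.containsKey(triggerId);
    }

    public static void main(String[] args) {
        RunRequestDeletionSketch sketch = new RunRequestDeletionSketch();
        CompletableFuture<String> ongoing = new CompletableFuture<>();
        sketch.register("token-a", ongoing);

        System.out.println("deleted: " + sketch.delete("token-a"));
        System.out.println("cancelled: " + ongoing.isCancelled());
        System.out.println("still listed: " + sketch.contains("token-a"));
    }
}
```

Note that CompletableFuture.cancel marks the future as cancelled but does not interrupt the thread that is executing the work, so stopping a long-running main method would need an additional mechanism beyond what this sketch shows.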

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
