Status

...

Page properties


Discussion thread

...

...

...

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block: WebSubmissionExtension (Java)
package org.apache.flink.runtime.webmonitor;

public class WebSubmissionExtension implements WebMonitorExtension {
	WebSubmissionExtension(
			Configuration configuration,
			GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
			Map<String, String> responseHeaders,
			CompletableFuture<String> localAddressFuture,
			Path jarDir,
			Executor executor,
			Time timeout,
			Supplier<ApplicationRunner> applicationRunnerSupplier)
			throws Exception {
		// New handlers backing the asynchronous jar-run endpoints.
		final SubmitJarRunAsyncHandler submitJarRunAsyncHandler = new SubmitJarRunAsyncHandler(...);
		final ListJarRunAsyncHandler listJarRunAsyncHandler = new ListJarRunAsyncHandler(...);
		final StatusJarRunAsyncHandler statusJarRunAsyncHandler = new StatusJarRunAsyncHandler(...);
		final DeleteJarRunAsyncHandler deleteJarRunAsyncHandler = new DeleteJarRunAsyncHandler(...);

		// Register each handler together with its REST headers.
		webSubmissionHandlers
				.add(Tuple2.of(SubmitJarRunAsyncHeaders.getInstance(), submitJarRunAsyncHandler));
		webSubmissionHandlers
				.add(Tuple2.of(StatusJarRunAsyncHeaders.getInstance(), statusJarRunAsyncHandler));
		webSubmissionHandlers
				.add(Tuple2.of(ListJarRunAsyncHeaders.getInstance(), listJarRunAsyncHandler));
		webSubmissionHandlers
				.add(Tuple2.of(DeleteJarRunAsyncHeaders.getInstance(), deleteJarRunAsyncHandler));
	}
}

Apart from the headers of the new API calls, the existing JarRunRequestBody needs two additional fields: a triggerId and a jarId. The triggerId is a unique value generated by the client, which Flink uses to identify retries of the same request (similar to an idempotency token). The jarId refers to a previously uploaded JAR. Both fields appear in JarRunAsyncRequestBody below:

...

Code Block
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunAsyncRequestBody",
	"properties" : {
		"allowNonRestoredState" : {
			"type" : "boolean"
		},
		"triggerId" : {
			"type" : "string"
		},
		"entryClass" : {
			"type" : "string"
		},
		"jarId" : {
			"type" : "string"
		},
		"jobId" : {
			"type" : "any"
		},
		"parallelism" : {
			"type" : "integer"
		},
		"programArgs" : {
			"type" : "string"
		},
		"programArgsList" : {
			"type" : "array",
			"items" : {
				"type" : "string"
			}
		},
		"restoreMode" : {
			"type" : "string",
			"enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
		},
		"savepointPath" : {
			"type" : "string"
		}
	}
}
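
To illustrate how a client might use triggerId as an idempotency token, here is a minimal sketch. It assumes the client generates the token as a random UUID and reuses the same value when it retries; the class name JarRunAsyncRequestExample, its methods, and the jar name example.jar are hypothetical and not part of Flink. The JSON is assembled by hand only to keep the example dependency-free; a real client would likely use a JSON library.

```java
import java.util.UUID;

/** Hypothetical client-side helper for illustration; not part of Flink. */
class JarRunAsyncRequestExample {

    /** Generates a fresh token for one logical submission (assumption: UUID format). */
    static String newTriggerId() {
        return UUID.randomUUID().toString();
    }

    /** Builds a minimal request body using only fields from the schema above. */
    static String buildRequestBody(String triggerId, String jarId, int parallelism) {
        return "{\"triggerId\":\"" + escape(triggerId) + "\","
                + "\"jarId\":\"" + escape(jarId) + "\","
                + "\"parallelism\":" + parallelism + "}";
    }

    /** Escapes backslashes and double quotes so the value is a valid JSON string. */
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // One token per logical submission; "example.jar" is a placeholder jarId.
        String triggerId = newTriggerId();
        String firstAttempt = buildRequestBody(triggerId, "example.jar", 4);
        // A retry reuses the same triggerId, so the body is identical.
        String retry = buildRequestBody(triggerId, "example.jar", 4);
        System.out.println(firstAttempt);
        System.out.println(firstAttempt.equals(retry));
    }
}
```

Because the retry carries the same triggerId, the server can recognize it as a repeat of the first attempt rather than a new submission.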

...

Code Block
{
	"type" : "object",
	"id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
	"properties" : {
		"request-id" : {
			"type" : "any"
		}
	}
}
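
A client would typically read request-id from this response in order to refer to the submission later. The sketch below assumes the value is serialized as a JSON string (the schema only says "any") and uses a regular expression to stay dependency-free; the class TriggerResponseExample and its method are hypothetical, and a real client would more likely use a JSON parser.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical client-side helper for illustration; not part of Flink. */
class TriggerResponseExample {

    // Matches "request-id" : "<value>" (assumption: the value is a JSON string).
    private static final Pattern REQUEST_ID =
            Pattern.compile("\"request-id\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the request-id if present, otherwise an empty Optional. */
    static Optional<String> extractRequestId(String responseJson) {
        Matcher matcher = REQUEST_ID.matcher(responseJson);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // "abc123" is a made-up identifier used only for this example.
        System.out.println(extractRequestId("{\"request-id\" : \"abc123\"}"));
        System.out.println(extractRequestId("{}"));
    }
}
```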

/run-async

...