Status

Discussion thread
Vote thread
JIRA
Release
Reason: Dropped in favor of FLIP-73/FLIP-74, which offer more general solutions for this.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation and background

Execution Modes

The bin/flink command supports several execution options along two basic axes: session mode vs. per-job mode, and attached vs. detached execution.

For session mode, there needs to be a pre-existing cluster that the client connects to. The choice of attached or detached is up to the client: it either waits for job completion or doesn't.

For the detached-per-job combination, the client spins up a per-job YARN cluster that is started with the JobGraph already available and executes only that JobGraph. For attached-per-job mode, the client currently starts a YARN session cluster and then uses the same code path as session mode. The reason for this is historical: before client/cluster communication went through REST, there was (as far as I recall) a real attached mode in which the client was connected to the server via Akka and the server would notice if the client failed. Now there is no real attached mode anymore.

We should also use a real per-job cluster for the attached-per-job case, but that has some implications for the functionality of bin/flink; see the Cluster Lifecycle and Public Interfaces sections.

JobGraph retrieval

For executing a user program, we need to retrieve a JobGraph from the main() method and hand that off to a client to trigger execution. There are two ways we do this currently: 1. hijacking execute() to throw an exception and 2. injecting a client into the ContextEnvironment.

This is a rough sketch of what the first method does:

userProgram = ... // something that has a main() method
environment = JobGraphExtractingEnv() // this throws the JobGraph in an exception when execute() is called

jobGraph = null
try {
  userProgram(environment)
} catch(e) {
  jobGraph = e.jobGraph
}

clusterDescriptor = getClusterDescriptor()
ignored = clusterDescriptor.deployJobCluster(jobGraph) // returns a ClusterClient that is not used in detached mode

return DummyJobExecutionResult(jobGraph.getJobID())

This has the downside that there can only be one execute() call per user program.
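
The limitation follows directly from the control flow: the exception thrown by the first execute() call unwinds the whole main() method. The following Java sketch illustrates this with hypothetical stand-in classes (JobGraph, JobGraphCapturedException, ExtractingEnvironment and extract() are invented for illustration and are not Flink classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins throughout; none of these are Flink classes.
final class JobGraphExtractionSketch {

    /** Minimal stand-in for a JobGraph; it only carries a name. */
    static final class JobGraph {
        final String name;
        JobGraph(String name) { this.name = name; }
    }

    /** Exception used to carry the JobGraph out of the user program. */
    static final class JobGraphCapturedException extends RuntimeException {
        final JobGraph jobGraph;
        JobGraphCapturedException(JobGraph jobGraph) {
            super("JobGraph captured: " + jobGraph.name);
            this.jobGraph = jobGraph;
        }
    }

    /** Environment whose execute() aborts the user program instead of running the job. */
    static final class ExtractingEnvironment {
        void execute(String jobName) {
            throw new JobGraphCapturedException(new JobGraph(jobName));
        }
    }

    /** Runs the user program and returns every JobGraph that could be extracted. */
    static List<JobGraph> extract(Consumer<ExtractingEnvironment> userProgram) {
        List<JobGraph> captured = new ArrayList<>();
        try {
            userProgram.accept(new ExtractingEnvironment());
        } catch (JobGraphCapturedException e) {
            captured.add(e.jobGraph);
        }
        return captured;
    }

    public static void main(String[] args) {
        List<JobGraph> graphs = extract(env -> {
            env.execute("first");
            env.execute("second"); // never reached: the first call already threw
        });
        System.out.println(graphs.size() + " JobGraph captured: " + graphs.get(0).name);
    }
}
```

Whatever the user program does after its first execute() call is lost, which is why this code path cannot support programs that submit several jobs.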

The second method does this:

userProgram = ... // something that has a main() method
clusterClient = createClusterClient()
environment = ContextEnvironment(clusterClient) // this calls clusterClient.submit() in the execute() method
userProgram(environment)
return environment.getLastJobResult()

The first method is used for detached-per-job mode; the second method is used for all other modes. We should remove that distinction and allow multiple execute() calls in per-job mode, each of which would spin up an independent per-job cluster.

Cluster Lifecycle 

In detached-per-job mode, the cluster lifecycle is tied to the job lifecycle: if the job finishes (successfully or otherwise) the cluster shuts down.

In the current (simulated) attached-per-job mode, the cluster lifecycle is not tied to the job lifecycle. It is the responsibility of the client to shut down the cluster. If the client fails for some reason the cluster will be kept running.

There is an internal attached-per-job mode for YARN. However, this is not used for running attached-per-job jobs via bin/flink. In this mode, the per-job cluster does not shut down immediately after a job finishes but waits for a client to retrieve the job result. Only after the job result is retrieved does the cluster shut itself down. This gives the client a chance to "attach" to the cluster and wait for the job result. If the client fails and the job result is not retrieved, the cluster will be kept running. With this mode of job submission, the client also gets no feedback from the job submission itself, unlike when submitting a job to an existing YARN session. The only way of retrieving feedback is by looking at the YARN logs.
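
The difference between the detached and the internal attached per-job lifecycle can be summarized as a small state machine. This is only an illustrative Java sketch of the behavior described above; the class, the state names and the callback methods are hypothetical and do not correspond to Flink's actual cluster entrypoint code:

```java
// Hypothetical model of a per-job cluster's lifecycle; not Flink code.
final class PerJobClusterLifecycleSketch {

    enum State { RUNNING, WAITING_FOR_RESULT_RETRIEVAL, SHUT_DOWN }

    private final boolean attached;
    private State state = State.RUNNING;

    PerJobClusterLifecycleSketch(boolean attached) {
        this.attached = attached;
    }

    /** Called when the job finishes, successfully or otherwise. */
    void onJobFinished() {
        if (state != State.RUNNING) {
            return;
        }
        // Detached: shut down right away. Attached: keep the result around for a client.
        state = attached ? State.WAITING_FOR_RESULT_RETRIEVAL : State.SHUT_DOWN;
    }

    /** Called when a client has fetched the job result. */
    void onJobResultRetrieved() {
        if (state == State.WAITING_FOR_RESULT_RETRIEVAL) {
            state = State.SHUT_DOWN;
        }
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        PerJobClusterLifecycleSketch detached = new PerJobClusterLifecycleSketch(false);
        detached.onJobFinished();
        System.out.println("detached after job finished: " + detached.state());

        PerJobClusterLifecycleSketch attachedCluster = new PerJobClusterLifecycleSketch(true);
        attachedCluster.onJobFinished();
        System.out.println("attached after job finished: " + attachedCluster.state());
        attachedCluster.onJobResultRetrieved();
        System.out.println("attached after result retrieved: " + attachedCluster.state());
    }
}
```

In this model, an attached cluster whose client never calls onJobResultRetrieved() stays in WAITING_FOR_RESULT_RETRIEVAL forever, which mirrors the "cluster will be kept running" case.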

Value

The value for users is that they get what they ask for: a real per-job cluster for per-job mode. Currently, the session cluster that is used for attached-per-job mode allows submitting jobs from other sources and can be left running if the client fails. This will now also leave the correct final application status in YARN.

Additionally, users would be able to send off multiple distinct jobs in detached-per-job mode.

What is in scope

We want to change how the client does job submission for attached-per-job mode: a proper per-job cluster should be used. In addition, we want to allow multiple job submissions in detached-per-job mode, where each submission would spin up a separate per-job cluster. This mostly comes as a side effect of getting rid of the first JobGraph retrieval method described above (hijacking execute() to throw an exception).

What is not in scope

We don't want to change how the internal cluster submission/communication works. In particular, we don't want to introduce a proper attached mode.

Public Interfaces 

This does not change public interfaces. What will change, however, is the feedback that a user gets when running bin/flink in attached mode: failures during job submission will no longer be reported back. See the Cluster Lifecycle section.

Design

We propose two changes: 1. stop using the "hijack execute() and throw an exception" code path and instead have an execute() method that does the per-job cluster deployment, and 2. also use that new code path for detached-per-job mode.

In pseudo-code, it would roughly look like this:

userProgram = ... // something that has a main() method
clusterDescriptor = ... // this describes the resource manager we're connecting to, for example YARN
environment = JobDeployingEnv(clusterDescriptor)

userProgram(environment)
return environment.getLastJobResult()

Where execute() would look like this:

function execute(jobGraph) {
  clusterClient = clusterDescriptor.deployPerJobCluster(jobGraph)
  if (isDetached()) {
    return DummyJobExecutionResult(jobGraph.getJobID())
  } else {
    // This is problematic because the cluster might not be ready yet, so we have to retry.
    // Or the cluster might never be ready because something failed during job submission.
    return clusterClient.waitForJobResult(jobGraph.getJobID())
  }
}
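
The retry mentioned in the comments above could take the form of a bounded polling loop. The following Java sketch is one possible shape under stated assumptions: waitForJobResult here is a hypothetical helper, the poll supplier stands in for a request to the cluster that returns an empty Optional while the cluster is not ready, and ClusterNotReadyException is an invented exception type. None of these are Flink APIs.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper for polling a per-job cluster; not Flink code.
final class JobResultPollingSketch {

    /** Thrown when no result could be retrieved within the allowed attempts. */
    static final class ClusterNotReadyException extends RuntimeException {
        ClusterNotReadyException(String message) {
            super(message);
        }
    }

    /**
     * Polls until a result is available or maxAttempts is exhausted.
     * An empty Optional from the supplier means "cluster not ready yet".
     */
    static <R> R waitForJobResult(Supplier<Optional<R>> poll, int maxAttempts, long pauseMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<R> result = poll.get();
            if (result.isPresent()) {
                return result.get();
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new ClusterNotReadyException("interrupted while waiting for the job result");
                }
            }
        }
        // Covers the case where the cluster never comes up because submission failed.
        throw new ClusterNotReadyException("no job result after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated cluster: not ready for the first two polls, then reports a result.
        String result = waitForJobResult(
                () -> ++calls[0] < 3 ? Optional.<String>empty() : Optional.of("FINISHED"),
                5,
                10L);
        System.out.println(result + " after " + calls[0] + " polls");
    }
}
```

A bounded loop like this cannot distinguish "cluster still starting" from "submission failed", so the client would only see a timeout in the failure case; that is the loss of submission feedback discussed under Public Interfaces.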

Compatibility, Deprecation, and Migration Plan

As mentioned above, the only change for users is that they don't get feedback from attached-per-job mode job submission. There is no deprecation or migration needed.

Test Plan

The changes should be covered by existing tests.

Rejected Alternatives

An alternative to this FLIP as a whole is to keep the simulated attached-per-job mode, because it has the nice property that errors that happen during job submission are reported back.

A possible solution for the downside of not getting job submission feedback is for the client to talk directly to the cluster manager (YARN, in this case) to figure out the state of job submission and also to have a side-channel for getting back job results.