Discussion thread: https://lists.apache.org/thread/18z5078qklkgfsdrrrcc85js3t6xsy9m
Vote thread
JIRA


Release: 1.10

Please keep the discussion on the mailing list rather than commenting on the wiki.

Motivation

In Flink, the entry point for a user into the Flink world is the ExecutionEnvironment or the StreamExecutionEnvironment, depending on whether the job is a batch or a streaming one.

These environments are responsible for:

  1. keeping the different configurations
  2. keeping track of the user-defined transformations
  3. creating the Plan/StreamGraph that corresponds to the job
  4. submitting the job to the cluster 


Focusing on the latter, this has led to each execution target having its own flavour of environment, so the number of environments is:

number_of_environments = number_of_APIs × number_of_execution_targets + ε

where APIs ∈ {batch, streaming},

execution_targets ∈ {local, remote, collection, cli/context},

and ε accounts for the extra special-purpose environments {optimizedPlan, previewPlan}.

Notice that in the above, there is no explicit mention of Yarn, K8s, Mesos, etc.

The above situation has led to problems both for Flink developers and for developers of downstream projects that use Flink, e.g. Beam and Zeppelin notebooks. For Flink developers, it has meant code duplication, as every environment has to have its own execute() logic, and maintainability problems due to poor separation of concerns (the environments are responsible for many things). For downstream projects, it has meant jumping through hoops in order to re-use Flink’s execution logic.

Proposal

In this document, we propose to abstract away from the Environments the job submission logic and put it in a newly introduced Executor. This will allow each API to have a single Environment which, based on the provided configuration, will decide which executor to use, e.g. Yarn, Local, etc. In addition, it will allow different APIs and downstream projects to re-use the provided executors, thus limiting the amount of code duplication and the amount of code that has to be written.

Executors

This document proposes the introduction of the Executor interface. This interface will abstract the execution/submission logic away from the environments. The executor will take as arguments an abstract representation of the user’s job and a configuration object containing the parameters relevant to that specific execution of that specific job and, for now, return a JobExecutionResult, as shown below:


public interface Executor {

  JobExecutionResult execute(Pipeline pipeline, Configuration executionConfig) throws Exception;

}


This signature is similar to that of the execute() method of the ExecutionEnvironment and StreamExecutionEnvironment with the difference that it takes a Pipeline as an argument. This is not coincidental.

We want to introduce the Pipeline interface as a common interface for the current Plan (batch) and StreamGraph (streaming) data structures. A Pipeline can be translated to a JobGraph, which we need for execution, using a PipelineTranslationUtil. This will decouple Pipeline translation from the executors and will allow the same Executor, e.g. the YarnExecutor, to be reused by the different APIs (batch and streaming) without any modifications. 
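
A rough sketch of how the Pipeline abstraction and the translation utility could fit together is shown below. The PipelineTranslator contract, the ServiceLoader-based lookup, and all method names are assumptions made for illustration and are not part of this proposal:


import java.util.ServiceLoader;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

/** Common interface that Plan (batch) and StreamGraph (streaming) will implement. */
public interface Pipeline {}

/** Hypothetical contract for turning one kind of Pipeline into a JobGraph. */
interface PipelineTranslator {
  boolean canTranslate(Pipeline pipeline);
  JobGraph translate(Pipeline pipeline, Configuration configuration);
}

/** Dispatches a Pipeline to a matching translator, keeping translation out of the executors. */
final class PipelineTranslationUtil {

  private PipelineTranslationUtil() {}

  public static JobGraph translateToJobGraph(Pipeline pipeline, Configuration configuration) {
    // Pick the translator (batch or streaming) that can handle the concrete Pipeline type.
    for (PipelineTranslator translator : ServiceLoader.load(PipelineTranslator.class)) {
      if (translator.canTranslate(pipeline)) {
        return translator.translate(pipeline, configuration);
      }
    }
    throw new IllegalStateException("No translator found for " + pipeline.getClass().getName());
  }
}
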

The responsibility of the Executor is to abstract the target execution settings (Yarn, Mesos, Kubernetes, etc.) away from the environment, while remaining agnostic to what is being executed there. This implies that there is going to be one executor per execution target.
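
As an example of what such a per-target executor could look like, the LocalExecutor below is a hypothetical sketch that translates the Pipeline with the PipelineTranslationUtil and runs the resulting JobGraph on an in-process MiniCluster; the class and the exact calls are illustrative only:


import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

/** Hypothetical executor for the "local" target: runs the job on an in-process MiniCluster. */
public class LocalExecutor implements Executor {

  @Override
  public JobExecutionResult execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
    // Translation is shared by all executors; only the submission below is target-specific.
    final JobGraph jobGraph = PipelineTranslationUtil.translateToJobGraph(pipeline, executionConfig);

    final MiniCluster miniCluster = new MiniCluster(
        new MiniClusterConfiguration.Builder()
            .setConfiguration(executionConfig)
            .build());
    try {
      miniCluster.start();
      return miniCluster.executeJobBlocking(jobGraph);
    } finally {
      miniCluster.close();
    }
  }
}
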

Executor Creation and Discovery

Executors will be created by stateless ExecutorFactories which will have to implement the following interface:


public interface ExecutorFactory {

  boolean isCompatibleWith(Configuration configuration);

  Executor getExecutor(Configuration configuration);

}


Flink will use Service Discovery to discover the available factories. Then, based on the provided Configuration, it will decide which executor to use. This configuration object will contain all the relevant parameters needed to instantiate the required Executor.
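
For illustration, the discovery step could be backed by Java's ServiceLoader. The ExecutorFactoryService helper below (whose name also appears in the snippet further down) is only a sketch of how the single compatible factory could be selected:


import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

import org.apache.flink.configuration.Configuration;

/** Sketch of a ServiceLoader-based lookup of the single compatible ExecutorFactory. */
public final class ExecutorFactoryService {

  private ExecutorFactoryService() {}

  public static Optional<ExecutorFactory> getExecutorFactory(Configuration configuration) {
    final List<ExecutorFactory> compatibleFactories = new ArrayList<>();
    for (ExecutorFactory factory : ServiceLoader.load(ExecutorFactory.class)) {
      if (factory.isCompatibleWith(configuration)) {
        compatibleFactories.add(factory);
      }
    }
    if (compatibleFactories.size() > 1) {
      // Ambiguity is treated as an error, just like finding no compatible factory at all.
      throw new IllegalStateException("Multiple compatible executor factories found for the given configuration.");
    }
    return compatibleFactories.isEmpty()
        ? Optional.empty()
        : Optional.of(compatibleFactories.get(0));
  }
}
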

In terms of code, the above means that, for example, the execute() method of the ExecutionEnvironment will become a concrete method with the following body:


public JobExecutionResult execute(String jobName) throws Exception {

  checkState(executorConfig != null);

  ExecutorFactory executorFactory = ExecutorFactoryService                    // (1)
      .getExecutorFactory(executorConfig)
      .orElseThrow(() -> new IllegalStateException("No executor found."));

  Plan plan = createProgramPlan(jobName);
  Pipeline pipeline = createProgramPipeline(plan);                            // (2)
  Executor executor = executorFactory.getExecutor(executorConfig);            // (3)
  this.lastJobExecutionResult = executor.execute(pipeline, executorConfig);   // (4)
  return lastJobExecutionResult;

}


In (1), Flink goes through all the ExecutorFactories found and, based on the executorConfig, which can be provided to the constructor, finds the one that is compatible with the included configuration options. For this, ExecutorFactory.isCompatibleWith(config) is used, and we are proposing to introduce a configuration parameter named executor.target to determine the right executor. Note that there must be exactly one compatible executor; in any other case, Flink will throw an exception.
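
To make the matching concrete, a factory could simply check whether the executor.target value names the target it serves. The YarnSessionExecutorFactory and YarnSessionExecutor below are purely hypothetical and only illustrate the idea:


import org.apache.flink.configuration.Configuration;

/** Hypothetical factory showing how isCompatibleWith() could match on executor.target. */
public class YarnSessionExecutorFactory implements ExecutorFactory {

  // Assumed value of executor.target for this factory; the actual target names are not fixed yet.
  private static final String TARGET_NAME = "yarn-session";

  @Override
  public boolean isCompatibleWith(Configuration configuration) {
    // "executor.target" is the configuration parameter proposed above.
    return TARGET_NAME.equals(configuration.getString("executor.target", null));
  }

  @Override
  public Executor getExecutor(Configuration configuration) {
    // Hypothetical executor implementation that submits against a running YARN session.
    return new YarnSessionExecutor(configuration);
  }
}
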

In (2) Flink creates the pipeline to be executed from the created Plan, and in (3) and (4), it gets an executor and passes the pipeline for further execution.

Prerequisites  

In order for the executors to work, there are two things that have to be done as part of this effort. These are:

  1. Introduce the Pipeline, the common interface of a user job for batch and streaming, along with a PipelineTranslationUtil for translating a Pipeline to a JobGraph. The main challenge here is working around a module structure that does not readily lend itself to this kind of sharing.
  2. Create the configuration based on which Flink will decide which Executor to use. For this, we need to be able to map all the relevant parameters that the user can specify by any means, e.g. the CLI, to valid configuration parameters. This means that the Configuration will become the single source of truth for the configuration parameters, and the CLI will become a thin layer that translates the command line to the corresponding configuration parameters (a rough sketch of this translation follows the list).
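
For instance, such a thin translation layer could look roughly like the following; the command-line option names and, apart from the proposed executor.target, the configuration keys are only illustrative:


import org.apache.commons.cli.CommandLine;

import org.apache.flink.configuration.Configuration;

/** Sketch of the CLI as a thin layer that only translates options into configuration parameters. */
public final class CliToConfigurationTranslator {

  private CliToConfigurationTranslator() {}

  public static Configuration translate(CommandLine commandLine) {
    final Configuration configuration = new Configuration();

    // Every command-line option maps to exactly one configuration parameter, so the
    // Configuration remains the single source of truth for execution settings.
    if (commandLine.hasOption("target")) {
      configuration.setString("executor.target", commandLine.getOptionValue("target"));
    }
    if (commandLine.hasOption("parallelism")) {
      configuration.setInteger("parallelism.default",
          Integer.parseInt(commandLine.getOptionValue("parallelism")));
    }
    return configuration;
  }
}
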

Rejected Alternatives

For the ExecutorFactory, we were considering an alternative interface where isCompatibleWith(Configuration config) is replaced by String getExecutorName(). The downside of the approach we advocate here is that the user has to provide the configuration twice, once in isCompatibleWith() and once in getExecutor(). On the flip side, the plus is that we have more flexibility to change the "matching" logic between configuration parameters and executors in the future, as this is hidden from the user. For example, in a first version we may want to have one executor for session-cluster deployments, but in the future we may want to split it into yarn-session, standalone-session, etc.
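
For reference, the rejected variant would have looked roughly like this:


// Rejected alternative, shown for comparison only.
public interface ExecutorFactory {

  // The factory advertises a fixed name instead of inspecting the configuration, so the
  // matching between configuration parameters and executors happens outside the factory.
  String getExecutorName();

  Executor getExecutor(Configuration configuration);

}
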

User-Facing Implications

So far, the above introduces no user-facing changes. But with the transition to a configuration-based execution process, where every possible execution parameter has a matching configuration parameter, the main() of a user’s program can now include everything a user needs to specify in order to run their job on any target environment. To make this more explicit, we are also planning to add a method getExecutionEnvironment(Configuration config) to the available environments.
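
As an illustration, assuming the proposed getExecutionEnvironment(Configuration) method and the executor.target parameter, a user's main() could look roughly like this (the concrete target value is an assumption):


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigurationBasedJob {

  public static void main(String[] args) throws Exception {
    // All execution parameters live in the Configuration, so the same main() can target
    // local, YARN, Kubernetes, etc. without any code changes.
    final Configuration configuration = new Configuration();
    configuration.setString("executor.target", "yarn-session"); // assumed value, for illustration

    // Proposed method; today the environment is obtained without a Configuration argument.
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);

    env.fromElements(1, 2, 3).print();

    env.execute("configuration-based-job");
  }
}
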

Compatibility, Deprecation, and Migration Plan

Executors do not break any existing behaviour, so there are no compatibility concerns.