Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As mentioned in the discussion of Interactive Programming, user applications might consist of multiple jobs and take a long time to finish. Currently, when Flink runs an application with multiple jobs, the application runs in a local process which is responsible for submitting the jobs. That local process will not exit until the whole application has finished, so users have to keep an eye on it in case it is killed due to a lost connection, a session timeout, local operating system problems, etc.

To solve this problem, we would like to introduce the Flink Driver. Users can submit applications in driver mode, in which a Flink driver job is submitted to take care of the job submissions in the user application.

Driver mode itself is not necessarily bound to interactive programming, but because most applications using interactive programming are long running, driver mode is particularly useful in that case.

Goals

The goals of driver mode:

  1. Execute user applications in a Flink Driver (a Flink job running the main function of the user program), so that users do not need to keep a long-running local process.
  2. Support all application submission approaches Flink currently supports, e.g. on a standalone cluster, a YARN / Kubernetes cluster, etc.
  3. Allow the application status to be queried when the application runs in driver mode.

Public Interfaces

All the public interface changes are on the ./bin/flink run command line.

  • Add a new option -D/--driver to enable driver mode.
  • Add the following new configuration options for the driver job when driver mode is used:

    • -Dhm, --driverheapmemory <arg>: Driver task heap memory in MB.
    • -Ddm, --driverdirectmemory <arg>: Driver task direct memory in MB.
    • -Dc, --drivercpus <arg>: Driver task CPU cores.
    • -Dnm, --drivernativememory <arg>: Driver task native memory in MB.
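
For illustration, these options would be combined with the existing run command roughly as follows (the option values and the application jar are placeholders):

bin/flink run -m yarn-cluster -D -Dhm 1024 -Dc 1 -c {ENTRY_CLASS} {PATH_TO_APPLICATION_JAR}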


Proposed Changes

Current status of running applications

Flink provides a Command-Line Interface (CLI) to run applications packaged as jar files. There are two modes, per-job and session, depending on whether the user provides an existing Flink cluster.

per-job

./bin/flink run application.jar

  • For each job in the application, a Flink cluster is deployed with the job graph. The Flink cluster will run the job graph as soon as the deployment completes. After the job finishes, the cluster will be destroyed and recycled.
  • Per-job mode only works if there is only one JobGraph in the user application.


session-mode

./bin/flink run application.jar -m JM_ADDRESS

  • The user retrieves a cluster client with the configured job manager host:port or cluster id, then submits jobs to that cluster using the ClusterClient. The Flink cluster is left untouched after the jobs finish.


Users can also specify attached / detached mode when running a Flink application via the command line. If a user application only contains a single job, the behavior is the following:

  • By default, attached mode is used. The command line process will wait until the job in the user application finishes before it exits.
  • If -d (detached) is specified as the command line argument, the command line process will exit once the job in the user application has been submitted, but not wait for it to finish.


If there are multiple jobs in the user application, the behavior of attached / detached mode is not really well defined and can sometimes be confusing. Furthermore, users can also specify a savepoint path to restore the job from by configuring the -s option. More options and their usages can be found in the appendix.

Run applications with driver mode

Similar to the current status, driver mode is only available when users run their applications via the command line. We would like to introduce a new option -D/--driver to ./bin/flink run.

Taking YARN mode as an example, users can execute the following command to submit an application with the Flink driver enabled:

bin/flink run -m yarn-cluster -D -c {ENTRY_CLASS} {PATH_TO_APPLICATION_JAR}

Driver mode behavior

Because applications in driver mode may submit multiple jobs, the current per-job mode is no longer supported. Therefore, a Flink cluster is required to run the jobs in an application.

When a user application runs in driver mode, a Flink driver job will first be created to host the main function of the user application. The job is then submitted to the given Flink cluster as an ordinary Flink job with parallelism set to one. That driver job invokes the user's main function and submits the jobs in the user application to the same Flink cluster that runs the driver job itself.
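
As an illustration, the driver task could invoke the user's main function via reflection, roughly as in the sketch below (the class and method names here are hypothetical, not the actual implementation):

import java.lang.reflect.Method;

// Hypothetical sketch of the driver task body; not the actual implementation.
class DriverTaskSketch {
    void runUserMain(String entryClassName, String[] programArgs) throws Exception {
        // The user jar is assumed to be on the driver task's user-code classpath.
        Class<?> entryClass = Class.forName(entryClassName);
        Method mainMethod = entryClass.getMethod("main", String[].class);
        // Inside the driver, the execution environment obtained by the user program is
        // expected to submit jobs to the same cluster that runs this task.
        mainMethod.invoke(null, (Object) programArgs);
    }
}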

Here’s the whole picture of the Flink Driver:

Since all jobs run in the same cluster, users can use the REST API to query job statuses and track the progress of their application. The name of a job submitted by the driver job will carry a prefix in front of the user-defined job name.
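
For example, the jobs running in the session cluster can be listed through the standard /jobs/overview REST endpoint. The sketch below uses a plain Java HTTP call; the host name and the default REST port 8081 are assumptions:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ListJobs {
    public static void main(String[] args) throws Exception {
        // Assumes the session cluster's REST endpoint is reachable at jobmanager-host:8081.
        URL url = new URL("http://jobmanager-host:8081/jobs/overview");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // The response lists job names and states; jobs submitted by the driver job
                // can be identified by the prefix described above.
                System.out.println(line);
            }
        }
    }
}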

Work with command line options

Job Configuration in Driver Mode

In driver mode, all option values on the bin/flink run command line can be classified into the following categories:

  • Flink cluster configurations: Users might want to specify YARN cluster properties, such as the queue, the application name/id, resources for containers, etc.
  • Flink driver job configurations: The driver job may need a separate set of resource configurations. For example, a driver job may need more memory because it collects a large table into its process. Ideally, we would therefore like to run the driver in a task manager whose size differs from that of the other task managers. One possible solution is to let the AM start task managers on demand, so that task managers only start when a job is submitted. To serve a wide variety of applications, it is necessary to make the driver task's resource consumption (heap memory, CPU cores, direct memory, etc.) configurable.
  • User job configurations: These are specified in the same way as for ordinary jobs.

See appendix for details.

Work with -m option

Users may or may not provide a Flink cluster when invoking ./bin/flink run.

  • If a Flink cluster is provided (-m option is specified), the command line process will simply submit the driver job to that cluster. When driver job and all the application jobs finish running, the Flink cluster is left untouched.
  • If no Flink cluster is provided, the command line process will first start a Flink session cluster, then submit the driver job to that cluster and exit. After the driver job and all application jobs finish running, the Flink cluster will also be destroyed.

Work with -d option

Users cannot use the -d and -D options at the same time; otherwise an exception will be thrown.

After introducing driver mode, there are the following possible cases:

  1. Without -d or -D: no behavior change.
  2. With -d: no behavior change.
  3. With -D: driver mode. The driver job always blocks waiting for the main function of the user application to finish.

The current behavior has a drawback: the driver job will not exit if the application is a streaming application, i.e. the jobs submitted by the driver job are long running. This means that users need to know whether an application is long running or not. It is not a regression, because users need to know that as well when using the -d option.

However, this caveat does hurt the user experience. In the long run, we would like to fix it. A possible solution is the following:

  1. The client / Flink driver always waits until the main function of the user application finishes.
  2. User applications can choose to run each job in a blocking or non-blocking way:
    1. execute() submits the job and blocks until it finishes.
    2. submit() submits the job and returns once the job has been submitted.

Because this is an issue orthogonal to the driver, and the solution may involve API changes, we will address it in a separate discussion.
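
To make the distinction concrete, a user application under such a solution might look like the sketch below; execute() is the existing blocking call, while submit() is only the hypothetical non-blocking call described above and does not exist today:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class MultiJobApplication {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // First job: blocking submission, the application waits until the job finishes.
        env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<Integer>());
        env.execute("blocking job");

        // Second job: the hypothetical non-blocking submit() would return as soon as the
        // job has been submitted to the cluster, without waiting for it to finish.
        // env.submit("non-blocking job");
    }
}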

Some design details

Why run Flink driver as a Flink job?

Another option for running the Flink driver is to make it a module running in the ApplicationMaster (Dispatcher).

We choose to launch the Flink Driver as a simple, lightweight Flink job due to the following considerations:

  • Free the cluster manager from resource management for the driver.
    • The Flink driver is responsible for executing user applications, whose resource consumption varies from app to app. If a driver started as a process in the cluster manager and ran an application requiring a large amount of memory and CPU resources, the cluster manager would suffer from heavy single-point pressure, not to mention running multiple drivers at the same time.
    • When a driver runs in session mode, the AM (dispatcher) of the existing Flink cluster may not have enough resources to run the driver. Running the driver as a Flink job is more flexible in that case, because TaskManagers usually have more abundant resources.
  • Leverage existing components and services to track the application execution status.
    • By starting the driver as a simple, lightweight Flink job, there is no need to create a new set of services and RPC calls for the driver to monitor and report the application running status.

Why do the driver and user jobs run in the same Flink cluster?

For the sake of cluster management and resource management, the driver job must run in the same cluster as the user jobs. If the driver and the user jobs ran in separate clusters, creating two different clusters would waste time and resources and would increase the difficulty of cluster lifecycle management and exception handling.

How to manage cluster lifecycle in driver mode?

In driver mode, the lifecycle of the Flink cluster depends on the context. If the Flink cluster was provided by the user via the -m option, the cluster should not be released after the application exits. On the other hand, if the Flink cluster was created automatically to run the application, it should be torn down after the application exits.

There are a few implementation challenges to enforce this behavior. The following are some solutions we are considering.

  • How to distinguish between a user-provided session and a command-line-created session.

The CLI client checks the -m option value. If a “host:port” or applicationId is present, it is in user-provided session mode; the CLI client will directly retrieve the provided cluster information and use it to submit or query jobs. Otherwise, a command-line-created session is used.

  • How to destroy the command-line-created session after the application finishes.

The Dispatcher is responsible for dispatching jobs in a Flink cluster, and the cluster will exit once the Dispatcher terminates. The following changes could be made to solve the problem:

  1. Introduce a new dispatcher named “DriverDispatcher”, a subclass of Dispatcher responsible for receiving job submissions from the client, starting JobMasters for the jobs, and maintaining job information. Most functionality of the DriverDispatcher is the same as that of the existing dispatchers; the main difference is that the DriverDispatcher monitors every job’s completion. Once there are no pending or running jobs, the DriverDispatcher reaches a termination state, and the running cluster is closed and recycled.
  2. When a job is submitted to the DriverDispatcher, a living priority is assigned to it. The driver job has the highest priority, common user jobs have normal priority, and other daemon jobs have the lowest priority. If the job with the highest priority reaches the FAILED state, the DriverDispatcher will stop all running jobs and exit.
  • How to manage the lifecycle of the DriverDispatcher.

The job cluster entry point will start the cluster components and the DriverDispatcher. As mentioned above, the DriverDispatcher listens for job termination. Once there are no pending or running jobs, the DriverDispatcher will reach a termination state and exit.

Exception handling in Driver Mode

If the driver job fails exceptionally, the DriverDispatcher will also exit and the whole application finishes execution marked as failed. The DriverDispatcher maintains job IDs and their living priorities when receiving job submissions. Every time the dispatcher gets a job termination callback, it fetches the job’s living priority and termination status; if the job has the highest priority and is in the FAILED state, it means the driver has failed exceptionally, so the dispatcher stops all running jobs, exits, and finishes the whole application.
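
The following is a rough sketch of this termination rule; JobPriority, the callback methods, and the helper methods are hypothetical names used only to illustrate the behavior described above:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.api.common.JobID;

// Hypothetical sketch of the DriverDispatcher termination logic; not actual Flink code.
class DriverDispatcherSketch {
    enum JobPriority { DRIVER, NORMAL, DAEMON }

    private final Map<JobID, JobPriority> livingPriorities = new ConcurrentHashMap<>();

    void onJobSubmitted(JobID jobId, JobPriority priority) {
        livingPriorities.put(jobId, priority);
    }

    void onJobTerminated(JobID jobId, boolean failed) {
        JobPriority priority = livingPriorities.remove(jobId);
        if (priority == JobPriority.DRIVER && failed) {
            // The driver job failed exceptionally: stop all remaining jobs and fail the application.
            stopAllRunningJobs();
            shutDownCluster();
        } else if (livingPriorities.isEmpty()) {
            // No pending or running jobs left: reach the termination state and release the cluster.
            shutDownCluster();
        }
    }

    private void stopAllRunningJobs() { /* cancel all jobs that are still running */ }
    private void shutDownCluster() { /* terminate a command-line-created session */ }
}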

Future works

Elastic session adaptation

When there are multiple jobs in an application, the same Flink cluster runs all of them. Because the jobs may have different resource footprints, that Flink cluster has to be large enough to run the largest job in the application, which wastes some resources. In the future, when the Flink cluster's resource consumption becomes more elastic, the Flink driver can also benefit from it.

Deprecation of the -d/--detached option in ./bin/flink run

The current behavior of the -d/--detached option falls short when there are multiple jobs in an application. In the future we probably want to deprecate it and let the application logic itself decide whether it should block until the jobs finish or not.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

  • Run the Flink driver as a module in the ApplicationMaster (Dispatcher).
  • Start a dedicated container or Kubernetes pod to run the Flink driver.

Appendixes

Options for Driver

  • -Dhm, --driverheapmemory <arg>: Driver task heap memory in MB.
  • -Ddm, --driverdirectmemory <arg>: Driver task direct memory in MB.
  • -Dc, --drivercpus <arg>: Driver task CPU cores.
  • -Dnm, --drivernativememory <arg>: Driver task native memory in MB.


Common Options

  • -b, --disableSubmitOptimization: If present, the job graph / user jars / lib jars / files will be transferred by the blob server. Otherwise, they will be transferred by the distributed cache on YARN or by the image on Kubernetes as an optimization.
  • -m, --jobmanager <arg>: Address of the JobManager (master) to connect to.
  • -c, --class <classname>: Class with the program entry point.
  • -C, --classpath <url>: Adds a URL to each user code classloader on all nodes in the cluster.
  • -d, --detached: If present, runs the job in detached mode.
  • -D, --driver: If present, runs the job in driver mode.
  • --file <files>: Attach custom files to the job.
  • --libjars <libraryJars>: Attach custom library jars to the job.
  • --mod <mod>: Run a Python library module as a script.
  • -n, --allowNonRestoredState: Allow skipping savepoint state that cannot be restored.
  • -r, --resume <resumePath>: Path to a checkpoint directory from which to restore the job from the latest externalized checkpoint.
  • -p, --parallelism <parallelism>: The parallelism with which to run the program.
  • -s, --fromSavepoint <savepointPath>: Path to a savepoint to restore the job from.


Options for yarn

  • -yid, --yarnapplicationId <arg>: Attach to a running YARN session.
  • -yj, --yarnjar <arg>: Path to the Flink jar file.
  • -yjm, --yarnjobManagerMemory <arg>: Memory for the JobManager container [in MB].
  • -ytm, --yarntaskManagerMemory <arg>: Memory per TaskManager container [in MB].
  • -yn, --yarncontainer <arg>: Number of YARN containers to allocate.
  • -ynm, --yarnname <arg>: Set a custom name for the application on YARN.
  • -yqu, --yarnquery: Display available YARN resources (memory, cores).
  • -ys, --yarnslots <arg>: Number of slots per TaskManager.
  • -yD <property=value>: Use the given value for the given property (dynamic properties).
  • -ysl, --yarnsharedLib <path>
  • -st, --yarnstreaming
  • -yt, --yarnship
  • -yta, --yarshipArchives
  • -yz, --yarnzookeeperNamespace: Namespace under which to create the ZooKeeper sub-paths for high availability mode.

Note: -yD is used for setting dynamic properties, which may be used to configure either the cluster or the user job. Therefore, all of them will eventually be added to the cluster configuration and the Flink configuration when deploying the cluster or submitting a job.

Options for kubernetes

  • -ki, --kubernetesimage <arg>: Container image to use for Flink containers.
  • -kjm, --kubernetesjobManagerMemory <arg>: Memory for the JobManager container [in MB].
  • -kms, --kubernetesmaster <arg>: Kubernetes cluster master URL.
  • -kn, --kubernetespods <arg>: Number of Kubernetes pods to allocate.
  • -kns, --kubernetsnamespace <arg>: Specify the Kubernetes namespace.
  • -ks, --kubernetsslots <arg>: Number of slots per TaskManager.
  • -ksa, --kubernetsserviceaddress <arg>: The exposed address of the Kubernetes service, used to submit jobs and view the dashboard.
  • -ktm, --kubernetstaskManagerMemory <arg>: Memory per TaskManager container [in MB].
  • -kD <property=value>: Use the given value for dynamic properties.