...

  • Add a new option -D/--driver to enable driver mode.
  • Add the following new configuration options for driver jobs when driver mode is used (an example invocation follows the options below):

-Dhm, --driverheapmemory <arg>      Driver task heap memory in MB.
-Ddm, --driverdirectmemory <arg>    Driver task direct memory in MB.
-Dc,  --drivercpus <arg>            Driver task CPU cores.
-Dnm, --drivernativememory <arg>    Driver task native memory in MB.
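
For illustration, a driver-mode submission combining the new -D option with these configurations might look like the following (the argument syntax, class name, and jar name are assumptions for this sketch, not confirmed by the proposal):

flink run -d -D -Dhm 2048 -Ddm 512 -Dc 2 -c com.example.MyDriver my-driver-job.jar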


Proposed Changes

Current status of running applications

...

  • Flink cluster configurations: Users might want to specify YARN cluster properties, such as the queue, the application name/id, and the resources for containers.
  • Flink driver job configurations: The driver job may need a separate set of resource configurations. For example, a driver job may need more memory because it collects a large table into that process. This means we would ideally run the driver in a task manager sized differently from the other task managers. One possible solution is to allow the AM to start task managers on demand, so that a task manager starts only when a job is submitted. To cover a wide variety of application needs, it must be possible to configure the driver task's resource consumption (heap memory, CPU cores, direct memory, etc.).
  • User job configurations: These are specified in the same way as for ordinary jobs (a combined example follows this list).
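
For illustration, the three kinds of configuration might appear together in a single submission like this (all option values, the class name, and the jar name are assumptions): cluster properties (-ynm, -yjm, -ytm), driver resources (-Dhm, -Dc), and user job settings (-p):

flink run -m yarn-cluster -ynm my-driver-app -yjm 1024 -ytm 2048 -D -Dhm 4096 -Dc 2 -p 8 -c com.example.MyDriver my-driver-job.jar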

...

  • How to distinguish between a user-provided session and a command-line created session.

The CLI client reads the value of the -m option; if a “host:port” address or an applicationId is present, the client is in user-provided session mode: it directly retrieves the provided cluster information and submits or queries jobs through it. Otherwise, a command-line created session is used.
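
A minimal sketch of this decision in Java, assuming a hypothetical SessionMode enum and resolver class (neither is part of Flink's CLI code):

enum SessionMode { USER_PROVIDED, COMMAND_LINE_CREATED }

class SessionModeResolver {

    // The -m option carries either a "host:port" address or a YARN
    // applicationId; any non-empty value indicates a user-provided session.
    static SessionMode resolve(String jobManagerOptionValue) {
        if (jobManagerOptionValue != null && !jobManagerOptionValue.trim().isEmpty()) {
            return SessionMode.USER_PROVIDED;
        }
        return SessionMode.COMMAND_LINE_CREATED;
    }
}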

  • How to destroy a command-line created session after the application finishes.

The Dispatcher is responsible for dispatching jobs in a Flink cluster, and the cluster exits once the Dispatcher terminates. The following changes could therefore be made to solve the problem (see the sketch after this list):

  1. Introduce a new dispatcher named “DriverDispatcher”, a subclass of Dispatcher responsible for receiving job submissions from clients, starting JobMasters for jobs, and maintaining job information. Most functionality of the DriverDispatcher is the same as that of existing dispatchers; the main difference is that the DriverDispatcher monitors every job’s completion. Once there are no pending or running jobs, the DriverDispatcher reaches a termination state, and the running cluster is closed and recycled.
  2. When a job is submitted to the DriverDispatcher, a living priority is assigned to it. The driver job has the highest priority, common user jobs have normal priority, and other daemon jobs have the lowest priority. If the job with the highest priority reaches FAILED, the DriverDispatcher stops all running jobs and exits.
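
A minimal sketch of these two rules, assuming hypothetical names (JobPriority, DriverDispatcherSketch) and a String job id as a stand-in for Flink's JobID; this illustrates the termination logic only, not the actual Dispatcher API:

import java.util.HashMap;
import java.util.Map;

enum JobPriority { DRIVER, NORMAL, DAEMON }

class DriverDispatcherSketch {

    // Jobs that are currently pending or running.
    private final Map<String, JobPriority> activeJobs = new HashMap<>();

    void onJobSubmitted(String jobId, JobPriority priority) {
        activeJobs.put(jobId, priority);
    }

    // Invoked whenever a monitored job reaches a terminal state.
    void onJobTerminated(String jobId, boolean failed) {
        JobPriority priority = activeJobs.remove(jobId);
        if (failed && priority == JobPriority.DRIVER) {
            // The highest-priority (driver) job failed: stop everything.
            stopAllRunningJobs();
            shutDown();
        } else if (activeJobs.isEmpty()) {
            // No pending or running jobs left: reach the termination state
            // so the cluster can be closed and recycled.
            shutDown();
        }
    }

    private void stopAllRunningJobs() { /* cancel every job in activeJobs */ }

    private void shutDown() { /* release cluster resources and exit */ }
}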
  • How to manage the lifecycle of the DriverDispatcher.

The job cluster entry point starts the cluster components and the DriverDispatcher. As mentioned above, the DriverDispatcher listens for job termination. Once there are no pending or running jobs, the DriverDispatcher reaches a termination state and exits.

Exception handling in Driver Mode

...

Appendixes

Options for Driver

-Dhm, --driverheapmemory <arg>      Driver task heap memory in MB.
-Ddm, --driverdirectmemory <arg>    Driver task direct memory in MB.
-Dc,  --drivercpus <arg>            Driver task CPU cores.
-Dnm, --drivernativememory <arg>    Driver task native memory in MB.


Common Options

-b,  --disableSubmitOptimization        If present, the jobgraph/userjars/libjars/files
                                        are transferred by the blob server; otherwise
                                        they are transferred by the distributed cache
                                        on YARN or by the image on Kubernetes for
                                        optimization.
-m,  --jobmanager <arg>                 Address of the JobManager (master) to connect to.
-c,  --class <classname>                Class with the program entry point.
-C,  --classpath <url>                  Adds a URL to each user code classloader on all
                                        nodes in the cluster.
-d,  --detached                         If present, runs the job in detached mode.
-D,  --driver                           If present, runs the job in driver mode.
     --file <files>                     Attach custom files for the job.
     --libjars <libraryJars>            Attach custom library jars for the job.
     --mod <mod>                        Run a Python library module as a script.
-n,  --allowNonRestoredState            Allow skipping savepoint state that cannot be
                                        restored.
-r,  --resume <resumePath>              Path to a checkpoint directory from which to
                                        restore the job from the latest externalized
                                        checkpoint.
-p,  --parallelism <parallelism>        The parallelism with which to run the program.
-s,  --fromSavepoint <savepointPath>    Path to a savepoint to restore the job from.


Options for YARN

-yid, --yarnapplicationId <arg>         Attach to a running YARN session.
-yj,  --yarnjar <arg>                   Path to the Flink jar file.
-yjm, --yarnjobManagerMemory <arg>      Memory for the JobManager container [in MB].
-ytm, --yarntaskManagerMemory <arg>     Memory per TaskManager container [in MB].
-yn,  --yarncontainer <arg>             Number of YARN containers to allocate.
-ynm, --yarnname <arg>                  Set a custom name for the application on YARN.
-yqu, --yarnquery                       Display available YARN resources (memory, cores).
-ys,  --yarnslots <arg>                 Number of slots per TaskManager.
-yD   <property=value>                  Use the given value for the specified property
                                        (dynamic properties).
-ysl, --yarnsharedLib <path>
-st,  --yarnstreaming
-yt,  --yarnship
-yta, --yarnshipArchives
-yz,  --yarnzookeeperNamespace          Namespace to create the ZooKeeper sub-paths for
                                        high availability mode.

Note: -yD is used to set dynamic properties, which may configure either the cluster or the user job. All of them are therefore added to the cluster configuration and the Flink configuration when deploying the cluster or submitting the job.
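
For illustration, a dynamic property could be passed as follows (the property key, class name, and jar name are only examples):

flink run -m yarn-cluster -yD taskmanager.network.memory.fraction=0.2 -c com.example.MyJob my-job.jar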

Options for Kubernetes

-ki,  --kubernetesimage <arg>               Container image to use for Flink containers.
-kjm, --kubernetesjobManagerMemory <arg>    Memory for the JobManager container [in MB].
-kms, --kubernetesmaster <arg>              Kubernetes cluster master URL.
-kn,  --kubernetespods <arg>                Number of Kubernetes pods to allocate.
-kns, --kubernetesnamespace <arg>           Specify the Kubernetes namespace.
-ks,  --kubernetesslots <arg>               Number of slots per TaskManager.
-ksa, --kubernetesserviceaddress <arg>      The exposed address of the Kubernetes service
                                            used to submit jobs and view the dashboard.
-ktm, --kubernetestaskManagerMemory <arg>   Memory per TaskManager container [in MB].
-kD   <property=value>                      Use the given value for the specified property
                                            (dynamic properties).