Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: Under Discussion

Released: TODO


Motivation


Flink uses a process model with one JobManager and multiple TaskManagers. The JobManager process includes the Dispatcher, ResourceManager, and RestEndpoint core components. Each component within the JobManager handles a different task. Their responsibilities are as follows:

...

This proposal describes a scheme for splitting the current process, together with a new process implementation that stays compatible with the original process model: the internal JobMaster component is split out of the JobManager, and a configuration parameter controls whether the new mode is enabled. When the user enables it, the JobMaster runs as an independent service, which reduces the workload of the JobManager. A new Dispatcher communicates with one or more split-out JobMasters to manage jobs.


Public Interfaces


  • Add a new configuration option 'jobmanager.jobmaster.single.enable' for enabling this feature.

  • Add a new bash script for manually starting JobMaster processes.

  • Add a new configuration file 'JOB_MASTERS' for specifying the JobMaster processes to be started when using 'start-cluster.sh'
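
As a rough illustration of how the option and the JOB_MASTERS file might be consumed, the sketch below reads the flag and parses the file. Only the option key comes from this proposal. The file format (one "host" or "host:port" per line, with '#' comments, modeled on Flink's existing masters and workers files) is an assumption, and the class JobMastersFile and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; not part of Flink or of this proposal. */
final class JobMastersFile {

    /** Option key taken from the proposal. */
    static final String ENABLE_KEY = "jobmanager.jobmaster.single.enable";

    /** One JobMaster endpoint; port is -1 when the line gives no port. */
    static final class Entry {
        final String host;
        final int port;

        Entry(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Returns true only when the option is present and parses as "true". */
    static boolean isEnabled(Map<String, String> flinkConf) {
        return Boolean.parseBoolean(flinkConf.getOrDefault(ENABLE_KEY, "false"));
    }

    /** Parses lines of the assumed form "host" or "host:port"; skips blanks and '#' comments. */
    static List<Entry> parse(List<String> lines) {
        List<Entry> entries = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int colon = line.lastIndexOf(':');
            if (colon < 0) {
                entries.add(new Entry(line, -1));
            } else {
                String host = line.substring(0, colon);
                int port = Integer.parseInt(line.substring(colon + 1));
                entries.add(new Entry(host, port));
            }
        }
        return entries;
    }
}
```

With this shape, a start script would launch separate JobMaster processes only when isEnabled returns true, and would otherwise keep the original single-process behavior.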

Proposed Changes


The current Flink runtime consists of two types of processes: a JobManager and one or more TaskManagers. The RestEndpoint in the JobManager receives client commands such as job submission, cancellation, and savepoint operations, and forwards them to the Dispatcher for processing.

...

    • BlobSharedClient: A client component that provides file writing and file downloading, and is used to download the JobGraph from the BlobServer (the component in the JobManager that is solely responsible for maintaining blob storage). Since the BlobServer will not be registered in the JobMasterContainerRunner, we need to encapsulate a client that contacts the BlobServer in the JobManager to download the required job files or JobGraph
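
One way to picture the BlobSharedClient is as a thin caching wrapper around a remote fetch. The sketch below is illustrative only: RemoteBlobFetcher stands in for whatever wraps the remote call to the BlobServer, and the string blob key, the local cache directory, and all method names are assumptions rather than part of the proposal or of Flink's API. Blob keys are assumed to be plain file names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical stand-in for the remote call to the BlobServer in the JobManager. */
interface RemoteBlobFetcher {
    byte[] fetch(String blobKey) throws IOException;
}

/** Hypothetical sketch of a BlobSharedClient that caches downloads on local disk. */
final class BlobSharedClientSketch {
    private final RemoteBlobFetcher fetcher;
    private final Path cacheDir;

    BlobSharedClientSketch(RemoteBlobFetcher fetcher, Path cacheDir) {
        this.fetcher = fetcher;
        this.cacheDir = cacheDir;
    }

    /** Returns the local path of the blob, downloading it only if it is not cached yet. */
    synchronized Path getFile(String blobKey) {
        Path target = cacheDir.resolve(blobKey);
        if (Files.exists(target)) {
            return target; // already downloaded by an earlier call
        }
        try {
            Files.createDirectories(cacheDir);
            Files.write(target, fetcher.fetch(blobKey));
        } catch (IOException e) {
            throw new UncheckedIOException("Could not download blob " + blobKey, e);
        }
        return target;
    }
}
```

The cache means a JobMaster process that runs several jobs sharing the same dependency contacts the BlobServer only once for it.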

JobMasterDispatcher

The existing Dispatcher must be extended so that job requests can be delivered to one or more JobMasterContainerRunner nodes. We therefore need to implement a new Dispatcher, named JobMasterDispatcher, for distributing job requests.

JobMasterPrimaryDispatcher

The JobMasterPrimaryDispatcher runs on the JobManager, where it takes over the responsibilities and work of the original Dispatcher. It forwards the job requests received from the user to one or more JobMasterSecondaryDispatchers to trigger the corresponding operations. Each JobMasterSecondaryDispatcher registers with it and maintains a heartbeat connection.
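
The registration, heartbeat, and forwarding described above could look roughly like the following sketch. It does not use Flink's RPC or heartbeat services. SecondaryGateway, PrimaryDispatcherSketch, and every method name are hypothetical, and the "fewest forwarded jobs" selection is only a placeholder policy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical view of a secondary dispatcher as seen from the primary. */
interface SecondaryGateway {
    void submitJob(String jobId);
}

/** Hypothetical sketch of the primary dispatcher running in the JobManager. */
final class PrimaryDispatcherSketch {
    private final Map<String, SecondaryGateway> secondaries = new TreeMap<>();
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();
    private final Map<String, String> jobOwner = new HashMap<>();

    /** Called when a JobMaster process registers itself. */
    synchronized void register(String jobMasterId, SecondaryGateway gateway, long nowMs) {
        secondaries.put(jobMasterId, gateway);
        lastHeartbeatMs.put(jobMasterId, nowMs);
    }

    /** Records a heartbeat from a registered JobMaster; unknown ids are ignored. */
    synchronized void heartbeat(String jobMasterId, long nowMs) {
        if (secondaries.containsKey(jobMasterId)) {
            lastHeartbeatMs.put(jobMasterId, nowMs);
        }
    }

    /** Drops every JobMaster whose last heartbeat is older than timeoutMs. */
    synchronized void expireStale(long nowMs, long timeoutMs) {
        lastHeartbeatMs.entrySet().removeIf(e -> nowMs - e.getValue() > timeoutMs);
        secondaries.keySet().retainAll(lastHeartbeatMs.keySet());
    }

    /** Forwards the job to the JobMaster with the fewest forwarded jobs (placeholder policy). */
    synchronized String submitJob(String jobId) {
        String chosen = null;
        long fewest = Long.MAX_VALUE;
        for (String id : secondaries.keySet()) {
            long count = jobOwner.values().stream().filter(id::equals).count();
            if (count < fewest) {
                fewest = count;
                chosen = id;
            }
        }
        if (chosen == null) {
            throw new IllegalStateException("No JobMaster registered");
        }
        secondaries.get(chosen).submitJob(jobId);
        jobOwner.put(jobId, chosen);
        return chosen;
    }
}
```

The sketch deliberately ignores what happens to jobs owned by an expired JobMaster; recovery of those jobs is outside what this illustration covers.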

JobMasterSecondaryDispatcher

The JobMasterSecondaryDispatcher must implement the DispatcherGateway and HeartBeat interfaces, and implementing these interfaces changes the existing Dispatcher behavior. The changes in behavior include the following:

  • When the user enables separation, the OnMainThreadJobManagerRunnerRegistry in the Dispatcher no longer applies. The DefaultJobManagerRunnerRegistry maintains multiple JobManagerRunners keyed by jobId. This registry should be maintained in each JobMaster's own independent thread, instead of maintaining each Runner thread in the Dispatcher. The JobMasterDispatcher is only responsible for maintaining the basic job information reported by each independent JobMaster, not the worker threads of each JobManagerRunner
  • Job submission, cancellation, tracking, listing, etc. are no longer handled at the thread level. Instead, these requests are forwarded through the JobMasterPrimaryDispatcher, which also collects and aggregates the results and then exposes the combined information to users through the REST API
  • JobManagerRunnerTerminationFutures tracks the termination of each job's worker thread. This part needs to be migrated to JobMasterEntryPoint, where it runs at the level of the whole component, and it will no longer run in the JobMasterDispatcher
  • ResourceCleaner contains the resource cleanup logic. Global cleanup is performed in the JobMasterDispatcher, while the localResourceCleaner for each job runs locally in the JobMaster that owns the job
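
The split of responsibilities in the list above can be sketched as follows: the dispatcher side keeps only basic job information and runs global cleanup, while each JobMaster runs its own local cleanup. The Cleaner interface, both class names, the status strings, and the "local first, then global" ordering are assumptions made for illustration; Flink's actual ResourceCleaner API differs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical cleanup hook; Flink's real ResourceCleaner API differs. */
interface Cleaner {
    void cleanup(String jobId);
}

/** Dispatcher side: keeps only basic job info and runs global cleanup. */
final class DispatcherSideSketch {
    private final Map<String, String> jobStatus = new LinkedHashMap<>();
    private final List<Cleaner> globalCleaners;

    DispatcherSideSketch(List<Cleaner> globalCleaners) {
        this.globalCleaners = globalCleaners;
    }

    /** Called by a JobMaster to report the basic status of one of its jobs. */
    synchronized void reportJob(String jobId, String status) {
        jobStatus.put(jobId, status);
    }

    /** Called by a JobMaster after its local cleanup for the job has run. */
    synchronized void onJobTerminated(String jobId) {
        for (Cleaner cleaner : globalCleaners) {
            cleaner.cleanup(jobId);
        }
        jobStatus.remove(jobId);
    }

    /** Snapshot of the reported jobs, e.g. for a REST job listing. */
    synchronized Map<String, String> listJobs() {
        return new LinkedHashMap<>(jobStatus);
    }
}

/** JobMaster side: owns the job's runner and runs local cleanup. */
final class JobMasterSideSketch {
    private final DispatcherSideSketch dispatcher;
    private final List<Cleaner> localCleaners;

    JobMasterSideSketch(DispatcherSideSketch dispatcher, List<Cleaner> localCleaners) {
        this.dispatcher = dispatcher;
        this.localCleaners = localCleaners;
    }

    void startJob(String jobId) {
        dispatcher.reportJob(jobId, "RUNNING");
    }

    /** Runs local cleanup first, then notifies the dispatcher for global cleanup. */
    void finishJob(String jobId) {
        for (Cleaner cleaner : localCleaners) {
            cleaner.cleanup(jobId);
        }
        dispatcher.onJobTerminated(jobId);
    }
}
```

Note that the dispatcher side never holds a runner or worker thread here; it only sees job ids and status strings.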


JobMasterEntryPoint

JobMasterEntryPoint is the entry class of the JobMaster. It is responsible for starting a JobMaster process and registering it with the JobManager, in the same way a TaskManager registers. Once the JobMaster is registered, jobs can be submitted to it from the JobMasterDispatcher.

...

● When the user manually closes the JobMaster, all jobs in this JobMaster will be stopped safely


BlobSharedClient

BlobSharedClient is mainly used inside the JobMasterRunner to download the compiled JobGraph or its dependencies by remotely accessing the BlobServer, so BlobSharedClient will include a BlobClient for that remote access. BlobSharedClient

...

  • The JobMaster thread is no longer attached to the Dispatcher as before. Instead, this work is handed over to the JobMasterContainerRunner through the newly implemented JobMasterDispatcher.
  • The JobMasterContainerRunner requires a client to access the BlobServer, because the BlobServer was previously accessed directly inside the JobManager.


Possible Next Steps After Standalone Mode Support

The following items will not be completed in the initial phase of work. For the sake of a complete FLIP, they are listed here as improvements to be made in the future:

  • Expose this component in the WebUI
  • Load balancing of jobs across JobMasterRunners
  • Implement this thread separation mode on Kubernetes and YARN

......


Compatibility, Deprecation, and Migration Plan


  • What impact (if any) will there be on existing users?

...

  • When will we remove the existing behavior?

None

Test Plan


  • Add new unit tests; for the time being, they neither affect nor change the original functionality
  • Test that a Flink job can run through its full life cycle on a single running JobMasterContainerRunner
  • Test that the original process model (without process separation) still runs Flink jobs normally


Rejected Alternatives

None