Motivation


Flink's runtime consists of one JobManager process and multiple TaskManager processes. The JobManager process contains three core components: the Dispatcher, the ResourceManager, and the RestEndpoint. Each component has its own responsibilities, as follows:

  • Dispatcher: After receiving the user's JobGraph, it parses it and creates a thread-level JobManagerRunner for each job. The JobManagerRunner creates a JobMaster. The JobMaster builds the ExecutionGraph, handles checkpoint coordination, shuffle, and scheduling, and then sends resource requests to the ResourceManager
  • RestEndpoint: Receives requests from the client or the WebUI, such as JobGraph submissions. It maintains a small web server internally and uses different handlers to route requests to the Dispatcher
  • ResourceManager: Responsible for resource allocation and management. Each environment and resource management platform (such as Standalone deployment, YARN, or K8S) has its own implementation for allocating resources


At present, when the Dispatcher receives a job (JobGraph) submitted by the user through the client, it creates a JobMaster thread for that job, and the thread runs inside the Dispatcher. Each JobMaster requests the slot resources its topology needs from the ResourceManager and communicates with the TaskManagers. The overall process is shown in the figure below.




The process is as follows:

① The user submits a job through the client (for example, the Flink Java client or the WebUI) to the RestEndpoint component of the JobManager

② After the RestEndpoint receives job-related requests (such as job submissions), it forwards them to the Dispatcher inside the JobManager

③ The Dispatcher receives the job-related request and, if it is a job submission, creates a JobMaster for the job

④ When the JobMaster receives the job graph, it converts it into an ExecutionGraph and writes it to the BlobServer. It then starts scheduling according to the job graph, requests resources from the ResourceManager, and interacts with the TaskManagers


This process model has problems in some scenarios. When a large number of jobs run in a SessionCluster, the JobManager becomes both a single point of failure and a performance bottleneck. For example, when a SessionCluster runs more than 100 jobs, the workload of the JobManager may become too high, and recovery takes too long when many jobs must be restored at once. This is because each JobMaster in the JobManager has many responsibilities. When multiple jobs run in the same JobManager, one JobMaster thread is created per job, and together these threads cause high load and long job recovery times.

This proposal describes a scheme for splitting the current process, together with a new process implementation that is compatible with the original process model: the JobMaster component is split out of the JobManager, and a parameter controls whether the new process model is enabled. When the user enables it, the JobMaster runs as an independent service, which reduces the workload of the JobManager. A new Dispatcher implementation communicates with the one or more separated JobMasters to manage jobs.


Public Interfaces


  • Add a new configuration option 'jobmanager.jobmaster.single.enable' for enabling this feature.

  • Add a new bash script for manually starting JobMaster processes.

  • Add a new configuration file 'JOB_MASTERS' for specifying JobMaster processes to be started when using 'start-cluster.sh'
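
The following is a minimal sketch of how the proposed switch might be evaluated. It uses only a plain key/value map rather than Flink's configuration classes. The class `JobMasterSeparationOption` and its methods are hypothetical names introduced for illustration; only the option key comes from this proposal. The default of `false` is an assumption based on the statement that the original process model is used by default.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical helper: reads the proposed switch from a flat key/value configuration. */
final class JobMasterSeparationOption {
    /** Option key proposed in this FLIP. */
    static final String KEY = "jobmanager.jobmaster.single.enable";

    /** Assumed default: disabled, so existing deployments keep the default Dispatcher. */
    static final boolean DEFAULT = false;

    private JobMasterSeparationOption() {}

    /** Returns the configured value, or the default when the key is absent. */
    static boolean isEnabled(Map<String, String> conf) {
        String raw = conf.get(KEY);
        return raw == null ? DEFAULT : Boolean.parseBoolean(raw.trim());
    }

    /** Names the dispatcher implementation the JobManager would start. */
    static String dispatcherFor(Map<String, String> conf) {
        return isEnabled(conf) ? "JobMasterDispatcher" : "Dispatcher";
    }

    public static void main(String[] args) {
        // Key absent: the original process model is kept.
        System.out.println(dispatcherFor(Collections.emptyMap()));
        // Key set to true: the separated process model is selected.
        System.out.println(dispatcherFor(Collections.singletonMap(KEY, "true")));
    }
}
```

An actual implementation would presumably declare the option through Flink's own configuration mechanism; the map-based lookup here only illustrates the opt-in behavior.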

Proposed Changes


The current runtime of Flink consists of two types of processes: a JobManager and one or more TaskManagers. The RestEndpoint in the JobManager is responsible for receiving commands such as user client submission, cancellation, and Savepoint operations, which will be forwarded to the Dispatcher for processing.

To support separating the JobMaster threads from the JobManager and reduce its workload, this feature adds the following new components:

  • JobMasterDispatcher: A Dispatcher implementation for separated JobMaster deployment. When the separation parameter is enabled, this implementation is used instead of the default Dispatcher. To the outside world it still looks like a complete Dispatcher, but it consists of two parts that behave differently on the JobManager side and on the JobMasterRunner side.
    • JobMasterPrimaryDispatcher: The part of the JobMasterDispatcher that runs on the JobManager. It forwards the requests that must be handled by a JobMaster, such as job submissions from the user client and Savepoint triggers
    • JobMasterSecondaryDispatcher: The part of the JobMasterDispatcher that runs on the JobMasterContainerRunner. It receives the requests sent by the JobMasterPrimaryDispatcher and creates a JobMaster thread for each job (similar to what the Dispatcher does today)
  • JobMasterEntryPoint: The entry class used to start the JobMasterContainerRunner. It instantiates the components required by the JobMasterContainerRunner
  • JobMasterContainerRunner: One or more such processes can exist. Each process runs multiple JobMaster threads and internally runs components such as the JobMasterSecondaryDispatcher, BlobSharedClient, and HeartBeatServer.
    • BlobSharedClient: A client component for writing and downloading files. It is used to download the JobGraph from the BlobServer (the storage component maintained solely by the JobManager). Since no BlobServer is registered in the JobMasterContainerRunner, a client is needed to download the required job files or JobGraph from the BlobServer in the JobManager

JobMasterDispatcher

The existing Dispatcher needs to be extended so that one or more JobMasterContainerRunner nodes can receive job requests. We therefore implement a new Dispatcher named JobMasterDispatcher for distributing job requests

JobMasterPrimaryDispatcher

The JobMasterPrimaryDispatcher runs on the JobManager and takes over the responsibilities of the original Dispatcher. It forwards the job requests received from the user to one or more JobMasterSecondaryDispatchers to trigger the corresponding operations. Each JobMasterSecondaryDispatcher registers with it and maintains a heartbeat connection to it
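
The registration, heartbeat, and forwarding responsibilities described above could be sketched roughly as follows. Everything here is an illustrative assumption rather than the proposed implementation: `SecondaryGateway` stands in for the RPC gateway of a JobMasterSecondaryDispatcher, the heartbeat timeout is a made-up parameter, and picking the first live runner is only a placeholder policy, since load balancing is listed as future work.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the RPC gateway of a JobMasterSecondaryDispatcher. */
interface SecondaryGateway {
    void submitJob(String jobId);
}

/** Sketch of the JobManager-side half: tracks registered secondaries and forwards requests. */
final class PrimaryDispatcherSketch {
    private final long heartbeatTimeoutMs;
    private final Map<String, SecondaryGateway> gateways = new LinkedHashMap<>();
    private final Map<String, Long> lastHeartbeatMs = new LinkedHashMap<>();
    private final Map<String, String> jobOwner = new LinkedHashMap<>();

    PrimaryDispatcherSketch(long heartbeatTimeoutMs) {
        this.heartbeatTimeoutMs = heartbeatTimeoutMs;
    }

    /** Called when a secondary dispatcher registers; registration counts as a heartbeat. */
    void register(String runnerId, SecondaryGateway gateway, long nowMs) {
        gateways.put(runnerId, gateway);
        lastHeartbeatMs.put(runnerId, nowMs);
    }

    /** Refreshes the liveness timestamp of an already registered runner. */
    void heartbeat(String runnerId, long nowMs) {
        if (gateways.containsKey(runnerId)) {
            lastHeartbeatMs.put(runnerId, nowMs);
        }
    }

    /** Forwards the job to the first runner with a fresh heartbeat (placeholder policy). */
    Optional<String> submitJob(String jobId, long nowMs) {
        for (Map.Entry<String, Long> entry : lastHeartbeatMs.entrySet()) {
            if (nowMs - entry.getValue() <= heartbeatTimeoutMs) {
                gateways.get(entry.getKey()).submitJob(jobId);
                jobOwner.put(jobId, entry.getKey());
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty(); // no live JobMasterContainerRunner is registered
    }

    /** Basic job information kept on the primary side: which runner owns the job. */
    Optional<String> ownerOf(String jobId) {
        return Optional.ofNullable(jobOwner.get(jobId));
    }

    public static void main(String[] args) {
        PrimaryDispatcherSketch primary = new PrimaryDispatcherSketch(1_000L);
        primary.register("runner-1", jobId -> System.out.println("runner-1 got " + jobId), 0L);
        System.out.println(primary.submitJob("job-1", 500L));
    }
}
```

Note that the primary side only records which runner owns a job. The JobMaster threads themselves live in the runner process, which matches the split of responsibilities described in this proposal.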

JobMasterSecondaryDispatcher

The JobMasterSecondaryDispatcher must implement the DispatcherGateway and HeartBeat interfaces. Implementing these interfaces changes the existing Dispatcher behavior in the following ways:

  • When the user enables separation, the OnMainThreadJobManagerRunnerRegistry in the Dispatcher no longer applies. The DefaultJobManagerRunnerRegistry maintains multiple JobManagerRunners keyed by jobId. This registry should be maintained independently in each JobMaster rather than in the Dispatcher. The JobMasterDispatcher is only responsible for maintaining the basic job information reported by each independent JobMaster, not the worker threads of each JobManagerRunner
  • Job submission, cancellation, tracking, listing, etc. are no longer done at the thread level. Instead, the JobMasterPrimaryDispatcher forwards these requests, collects and aggregates the results, and exposes the combined information to users through the REST API
  • JobManagerRunnerTerminationFutures tracks the termination of each job's worker thread. This part needs to be migrated to the JobMasterEntryPoint, where it runs from the global perspective of that component, and it will no longer run in the JobMasterDispatcher
  • ResourceCleaner contains the resource cleanup logic. Global cleanup is performed in the JobMasterDispatcher, while the localResourceCleaner for each job runs locally in each JobMaster


JobMasterEntryPoint

The JobMasterEntryPoint is the entry class of the JobMaster. It is responsible for starting a JobMaster process and registering it with the JobManager, in the same way a TaskManager registers. After the JobMaster is registered, jobs can be submitted to it through the JobMasterDispatcher

JobMasterEntryPoint creates the JobManagerRunners, HighAvailabilityServices, HeartbeatServices, and other component services

When all internal components are ready, the JobMasterPrimaryDispatcher is notified, and users can then submit jobs


● The user starts one or more independent JobMaster processes through scripts. The startup procedure is the same as for a TaskManager

● After the JobMaster is fully started, it registers itself with the DispatcherGateway and the HA services. The Dispatcher reads information such as the communication address from HA and learns which JobMasters are currently available and what their workload is, so that subsequent user job submissions can be dispatched.

● When the user deploys the JobManager with the separation parameter enabled, jobs are submitted through the JobMasterDispatcher to an independent JobMaster (provided that the JobMaster has started, is ready, and is registered with HA or the JobMasterDispatcher)

● After a job completes (batch or stream processing), the JobMaster releases the JobManagerRunner it owns and reports the relevant information to the JobMasterDispatcher

● When the user manually shuts down the JobMaster, all jobs in this JobMaster are stopped safely
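
The lifecycle in the list above can be illustrated with a small state sketch. It is a simplified model under stated assumptions: the state names, the `reports` list (a stand-in for messages sent to the JobMasterDispatcher), and the class name `ContainerRunnerLifecycleSketch` are all hypothetical, and real registration, HA, and RPC are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Sketch of the lifecycle of one separated JobMaster process (hypothetical names). */
final class ContainerRunnerLifecycleSketch {
    enum State { STARTING, READY, STOPPED }

    private State state = State.STARTING;
    private final Set<String> runningJobs = new LinkedHashSet<>();
    /** Stand-in for the messages reported to the JobMasterDispatcher. */
    private final List<String> reports = new ArrayList<>();

    /** Called once all internal components are started; models registration. */
    void markReady() {
        if (state == State.STARTING) {
            state = State.READY;
            reports.add("REGISTERED");
        }
    }

    /** Jobs are accepted only while the process is registered and ready. */
    boolean submit(String jobId) {
        if (state != State.READY) {
            return false;
        }
        return runningJobs.add(jobId); // false if the job is already running here
    }

    /** On completion the per-job runner is released and the result is reported. */
    void jobFinished(String jobId) {
        if (runningJobs.remove(jobId)) {
            reports.add("FINISHED " + jobId);
        }
    }

    /** Manual shutdown stops every job that is still running in this process. */
    void shutdown() {
        for (String jobId : new ArrayList<>(runningJobs)) {
            runningJobs.remove(jobId);
            reports.add("STOPPED " + jobId);
        }
        state = State.STOPPED;
    }

    State state() {
        return state;
    }

    List<String> reports() {
        return new ArrayList<>(reports);
    }

    public static void main(String[] args) {
        ContainerRunnerLifecycleSketch runner = new ContainerRunnerLifecycleSketch();
        runner.markReady();
        runner.submit("job-1");
        runner.shutdown();
        System.out.println(runner.reports());
    }
}
```

The key property is that a submission arriving before registration, or after shutdown, is rejected rather than queued. Whether the real design rejects or buffers such requests is not specified in this proposal.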


BlobSharedClient

BlobSharedClient is mainly used inside the JobMasterRunner to download the compiled JobGraph or dependencies from the BlobServer through remote access, so it wraps a BlobClient that accesses the BlobServer remotely. BlobSharedClient implements the BlobWriter and PermanentBlobService interfaces for compatibility with the file operations that the JobMaster performs
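
As a rough illustration of the idea, the sketch below models a client that sends every read and write to a remote blob store. It does not reproduce Flink's BlobWriter or PermanentBlobService interfaces: `RemoteBlobStore`, `InMemoryBlobStore`, and the string-based keys are simplified, hypothetical stand-ins. The local cache is an added assumption for illustration and is not part of the proposal.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for remote access to the BlobServer in the JobManager. */
interface RemoteBlobStore {
    byte[] get(String jobId, String key);

    void put(String jobId, String key, byte[] value);
}

/** In-memory store used only to make the sketch runnable. */
final class InMemoryBlobStore implements RemoteBlobStore {
    private final Map<String, byte[]> blobs = new HashMap<>();

    @Override
    public byte[] get(String jobId, String key) {
        return blobs.get(jobId + "/" + key);
    }

    @Override
    public void put(String jobId, String key, byte[] value) {
        blobs.put(jobId + "/" + key, value);
    }
}

/** Sketch: reads and writes go to the remote store; fetched blobs are cached locally. */
final class BlobSharedClientSketch {
    private final RemoteBlobStore remote;
    private final Map<String, byte[]> localCache = new HashMap<>();
    private int remoteReads;

    BlobSharedClientSketch(RemoteBlobStore remote) {
        this.remote = remote;
    }

    /** Downloads a blob (for example a serialized JobGraph) unless it is cached already. */
    byte[] getBlob(String jobId, String key) {
        String cacheKey = jobId + "/" + key;
        byte[] cached = localCache.get(cacheKey);
        if (cached == null) {
            cached = remote.get(jobId, key);
            if (cached == null) {
                throw new IllegalStateException("Blob not found: " + cacheKey);
            }
            remoteReads++;
            localCache.put(cacheKey, cached);
        }
        return cached.clone(); // defensive copy so callers cannot corrupt the cache
    }

    /** Writes a blob to the remote store and keeps a local copy. */
    void putBlob(String jobId, String key, byte[] value) {
        remote.put(jobId, key, value.clone());
        localCache.put(jobId + "/" + key, value.clone());
    }

    int remoteReads() {
        return remoteReads;
    }

    public static void main(String[] args) {
        InMemoryBlobStore server = new InMemoryBlobStore();
        server.put("job-1", "graph", new byte[] {1, 2, 3});
        BlobSharedClientSketch client = new BlobSharedClientSketch(server);
        System.out.println(client.getBlob("job-1", "graph").length);
    }
}
```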


This independent component needs to implement basic functions such as HA and heartbeat. When the independent JobMaster component completes its startup, it is registered as ready. From then on, the new Dispatcher implementation running inside the JobManager, JobMasterDispatcher, can submit jobs through the JobMasterGateway to the remote, independent JobMaster process instead of running them inside the JobManager.

When a user submits a job through the Flink client or WebUI, the default Dispatcher is no longer used. Instead, the JobMasterDispatcher submits the job via RPC, through the registered JobMasterGateway, to the independently running JobMaster process. When the JobMaster process receives the job submission request, it performs job preparation, scheduling, and so on as before. The subsequent workflow does not change much, but some work that was originally done by threads in the Dispatcher, such as job cleanup and the termination thread for stopped jobs, may need to be migrated to the separate JobMaster. This is described in detail in the design of JobMasterEntryPoint

When the user sets jobmanager.jobmaster.single.enable to true, JobManager process separation is enabled. The JobManager then uses the JobMasterDispatcher implementation: it runs the JobMasterPrimaryDispatcher internally, receives the job requests submitted by the client, and forwards them to the registered JobMasterContainerRunner. When the JobMasterContainerRunner receives these requests, it creates the JobMaster threads in its own process and runs them as in the non-separated model. The separated process model is as follows



After the separation parameter is enabled, the job submission process looks like this:

  • ① The user submits a job through the client (for example, the Flink Java client or the WebUI) to the RestEndpoint component of the JobManager
  • ② After the RestEndpoint receives job-related requests (such as SubmitJob), it forwards them to the JobMasterPrimaryDispatcher component inside the JobManager
  • ③ The JobMasterPrimaryDispatcher forwards each job submission and related request (such as triggering a Savepoint) to the JobMasterContainerRunner via RPC
  • ④ The JobMasterSecondaryDispatcher inside the JobMasterContainerRunner receives the job request from the JobManager. If it is a job submission, a JobMaster thread is created as before, the job graph is converted into an ExecutionGraph and written to the BlobServer, and the JobMaster starts scheduling according to the job graph, requests resources from the ResourceManager, and interacts with the TaskManagers. None of these later steps change


Compared with the original process, the differences are as follows:

  • The JobMaster thread is no longer attached to the Dispatcher. Instead, the newly implemented JobMasterDispatcher hands this work over to the JobMasterContainerRunner.
  • The JobMasterContainerRunner requires a client to access the BlobServer, because the BlobServer was previously accessed directly inside the JobManager


Possible next steps after Standalone mode support is finished

This work is not part of the initial phase, but for completeness this FLIP lists it as future improvements:

  • Exposing this component in the WebUI
  • Load balancing of jobs across JobMasterRunners
  • Implementing this separation mode on Kubernetes and YARN

......


Compatibility, Deprecation, and Migration Plan


  • What impact (if any) will there be on existing users?

This feature has no impact on existing users. It is only used when the user enables it, and the original process model is used by default.

  • If we are changing behavior how will we phase out the older behavior?

The original behavior will not be removed for now

  • If we need special migration tools, describe them here.

No special migration tools are needed. To run a new job on a separate JobMasterContainerRunner process, the user only needs to turn on the switch; the change is otherwise transparent to the user

  • When will we remove the existing behavior?

None

Test Plan


  • Add new unit tests; they do not affect or change the original functionality for now
  • Test that the full life cycle of a Flink job runs normally on a single JobMasterContainerRunner
  • Test that the original process model (without process separation) still runs Flink jobs normally


Rejected Alternatives

None