
 

Status

Current state: Under Discussion

Discussion thread: - 

JIRA: -

Released: -

 

This Flink Improvement Proposal outlines improvements in how Flink is used on top of various cluster managers (YARN, Mesos) and deployment technologies (Docker, Kubernetes, etc.).

For simplicity, we refer to all of these cluster and deployment frameworks as cluster managers.


 

Original design draft document: https://docs.google.com/document/d/1zwBP3LKnJ0LI1R4-9hLXBVjOXAkQS5jKrGYYoPz-xRk

(may still be used for copying and adapting figures)



Motivation

This FLIP addresses a series of shortcomings in the current integration with YARN and container managers (Docker/Kubernetes) and future operations with Mesos.

Shortcomings to address

  • Resource allocation is currently static. A job needs to grab all required resources up front and maintain that static set of resources throughout the execution. The improvement will allow dynamic acquisition and release of resources on YARN and Mesos, for better resource utilization and dynamic scaling.

  • On YARN, all allocated containers are currently of the same size. The improvement will allow allocating containers of different sizes (memory / CPU cores) for different operators.

  • Interaction with container management infrastructures like Docker/Kubernetes is clumsy, because Flink jobs are deployed in two steps: (1) start the framework, (2) submit the job. With this improvement, jobs can be dockerized and deployed in a natural way as part of the container deployment, and step (2) will no longer be necessary.

  • Currently, the JobManager accepts multiple concurrent jobs, but has no notion of “fair-”, “capacity-based-”, or “priority-based-” scheduling. In this proposal, we delegate all notion of cross-job resource prioritization to frameworks like YARN and Mesos, which already implement that.

  • In Flink on YARN, the web dashboard disappears after the job has finished.

  • The concepts of “sessions” (clusters that accept multiple jobs) and “per-job clusters” are intermingled and hard to understand. This proposal separates them more clearly.


Core Changes


Single Job JobManager

The most important change is that the JobManager handles only a single job. The JobManager will be created with a JobGraph and will be destroyed after the job execution is finished. This model maps more naturally to what happens with most jobs anyway.

Cross-job functionality is handled by other components that wrap and create JobManagers. This leads to a better separation of concerns and makes the components easier to compose for various cluster managers.

The JobManager constructor will also optionally take a Savepoint or Checkpoint to initialize the job from.

ResourceManager

The ResourceManager (introduced in Flink 1.1) is the cluster-manager-specific component. There is a generic base class, and specific implementations for:

  • YARN

  • Mesos

  • Standalone-multi-job (Standalone mode)

  • Self-contained-single-job (Docker/Kubernetes)


The main tasks of the ResourceManager are:

  • Acquiring new TaskManagers (or slots) by starting containers, or allocating them to a job

  • Giving failure notifications to JobManagers and TaskManagers

  • Caching TaskManagers (containers) to be reused, releasing TaskManagers (containers) that are unused for a certain period.
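
The caching and releasing task could be sketched roughly as follows. This is only an illustration under assumptions: the class `IdleTaskManagerCacheSketch`, its method names, and the millisecond timeout are hypothetical and not part of Flink or of this proposal.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of a cache of idle TaskManagers; not a Flink class. */
final class IdleTaskManagerCacheSketch {
    private final long idleTimeoutMillis;
    // TaskManager id -> time at which it became idle, kept in insertion order.
    private final Map<String, Long> idleSince = new LinkedHashMap<>();

    IdleTaskManagerCacheSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Records that a TaskManager (container) has no work and can be reused. */
    void markIdle(String taskManagerId, long nowMillis) {
        idleSince.putIfAbsent(taskManagerId, nowMillis);
    }

    /** Hands out a cached TaskManager, or empty if a new container must be started. */
    Optional<String> acquire() {
        Iterator<String> it = idleSince.keySet().iterator();
        if (!it.hasNext()) {
            return Optional.empty();
        }
        String id = it.next();
        it.remove();
        return Optional.of(id);
    }

    /** Removes and returns the TaskManagers that have been idle for at least the timeout. */
    List<String> releaseExpired(long nowMillis) {
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = idleSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= idleTimeoutMillis) {
                released.add(entry.getKey());
                it.remove();
            }
        }
        return released;
    }
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real implementation would presumably hand the released containers back to the cluster manager.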


The ResourceManager may or may not be task-slot aware (it probably will be). The difference in implementation is minimal: a slot-aware ResourceManager maintains a map of available TaskManager slots, rather than a map of available TaskManagers.

For the sake of simplicity, the following talks about “slots”, but one can think simply of “TaskManager” instead, for the common case of a one-slot TaskManager.


The following are core aspects of the ResourceManager design:

  • The ResourceManager no longer has a resource pool size, but receives individual requests for slots. That way, jobs can request TaskManagers of different resources (Memory/CPU).

  • The ResourceManager lives across jobs and JobManagers. That enables the use of sessions and the Standalone Mode.

    • Consequently, the ResourceManager is the first point of contact for TaskManagers and handles the TaskManager registration

    • To have a unified way of handling container-caching across Standalone Mode, for batch jobs with varying resource requirements, and for sessions, the ResourceManager keeps the pool of available TaskManagers and their slots.

  • The ResourceManager must be able to fail without interfering with the execution of current jobs. Running jobs can still continue executing and use the slots they have allocated across task failures and retries. They will only not be able to acquire new slots while the ResourceManager is down.

    • The ResourceManager may actually go down in Yarn / Mesos during maintenance. This should not stop the streaming programs.

    • While JobManagers and TaskManagers may be redundant and have a seamless failover, the ResourceManager may at any time be temporarily unable to provide containers or slots, as a result of the Yarn / Mesos architecture.

  • ResourceManager fault tolerance should work without persistent state in general

    • All that the ResourceManager does is negotiate between the cluster-manager, the JobManager, and the TaskManagers. Its state can hence be reconstructed from re-acquiring containers and re-registration from JobManagers and TaskManagers

    • Note that certain specializations (for example for Mesos or Yarn) may still persist cluster-manager-specific state, if that is required.

  • JobManagers may register at the ResourceManager. A registered JobManager gets notifications about TaskManager failures for the slots it allocated from that TaskManager.
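
As a rough illustration of the design aspects above, the following sketch shows a slot-aware ResourceManager that serves individual slot requests with different resource requirements and keeps no persistent state. All names (`ResourceProfileSketch`, `SlotAwareResourceManagerSketch`, and their methods) are hypothetical and chosen only for this example.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical description of the resources of a slot or of a request. */
final class ResourceProfileSketch {
    final int memoryMb;
    final double cpuCores;

    ResourceProfileSketch(int memoryMb, double cpuCores) {
        this.memoryMb = memoryMb;
        this.cpuCores = cpuCores;
    }

    /** True if a slot with this profile is large enough for the request. */
    boolean satisfies(ResourceProfileSketch requested) {
        return memoryMb >= requested.memoryMb && cpuCores >= requested.cpuCores;
    }
}

/** Hypothetical slot-aware ResourceManager; all state lives in memory only. */
final class SlotAwareResourceManagerSketch {
    // Free slot id -> resources of that slot, as advertised by TaskManagers.
    private final Map<String, ResourceProfileSketch> freeSlots = new LinkedHashMap<>();
    private int pendingContainerRequests = 0;

    /** Called when a TaskManager registers (or re-registers) a free slot. */
    void registerFreeSlot(String slotId, ResourceProfileSketch profile) {
        freeSlots.put(slotId, profile);
    }

    /**
     * Serves one slot request. If a registered slot fits, it is handed out
     * directly; otherwise the sketch only counts that a new container is needed.
     */
    Optional<String> requestSlot(ResourceProfileSketch requested) {
        Iterator<Map.Entry<String, ResourceProfileSketch>> it = freeSlots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ResourceProfileSketch> entry = it.next();
            if (entry.getValue().satisfies(requested)) {
                String slotId = entry.getKey();
                it.remove();
                return Optional.of(slotId);
            }
        }
        pendingContainerRequests++;
        return Optional.empty();
    }

    int pendingContainerRequests() {
        return pendingContainerRequests;
    }

    int freeSlotCount() {
        return freeSlots.size();
    }
}
```

Because the map is filled purely from registrations, a freshly started instance can rebuild its view once the TaskManagers re-register, which is the property the fault tolerance bullet relies on.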





In the case where the ResourceManager already has slots available from registered TaskManagers, steps (2) and (3) are skipped.


TaskManager

A TaskManager is in contact with both the ResourceManager and the JobManager:

It talks to the ResourceManager to advertise its slot resources, and to the JobManager to execute tasks in the slots allocated to that job. It needs to heartbeat both managers to monitor their liveness and detect failures.


ResourceManager interaction

  • The TaskManager initially registers at the ResourceManager. A disconnect from the ResourceManager simply results in retrying to register and advertise the currently available slots.

  • With each heartbeat, the TaskManager also transmits its slot availability. That way, the ResourceManager will always know about available slots. In addition, direct notifications about slots becoming available update the ResourceManager’s view faster.

  • The TaskManager’s view about which slot is taken (and by which JobManager) is the ground truth - the ResourceManager derives its view from the TaskManager notifications and heartbeats.

  • The ResourceManager may tell the TaskManager to give a slot to a specific JobManager, and the TaskManager will offer that slot to the JobManager. If not accepted, the TaskManager notifies the ResourceManager that the slot is in fact available.

  • The ResourceManager may tell the TaskManager to shut down (exit the process)
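
The idea that the TaskManager's view is the ground truth can be sketched as follows: the ResourceManager overwrites its own view with every heartbeat report instead of merging it. The class `ResourceManagerSlotViewSketch` and its methods are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical view of free slots that a ResourceManager derives from TaskManager reports. */
final class ResourceManagerSlotViewSketch {
    // TaskManager id -> slots that this TaskManager last reported as free.
    private final Map<String, Set<String>> freeSlotsByTaskManager = new HashMap<>();

    /** A heartbeat carries the full slot availability and replaces the previous view. */
    void onHeartbeat(String taskManagerId, Set<String> reportedFreeSlots) {
        freeSlotsByTaskManager.put(taskManagerId, new HashSet<>(reportedFreeSlots));
    }

    /** A direct notification updates the view before the next heartbeat arrives. */
    void onSlotFreed(String taskManagerId, String slotId) {
        freeSlotsByTaskManager
                .computeIfAbsent(taskManagerId, id -> new HashSet<>())
                .add(slotId);
    }

    /** Returns what the ResourceManager currently believes about one slot. */
    boolean believesFree(String taskManagerId, String slotId) {
        return freeSlotsByTaskManager
                .getOrDefault(taskManagerId, Collections.emptySet())
                .contains(slotId);
    }
}
```

Replacing the entry on each heartbeat means that any stale belief on the ResourceManager side is corrected by the next report, at the cost of sending the full availability every time.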


JobManager interaction

  • The TaskManager offers a slot to a JobManager at the ResourceManager’s behest. That slot is then tied to that JobManager until the JobManager releases the slot.

  • The TaskManager watches all JobManagers to which it has offered slots. Loss of connection to the JobManager results in triggering master-failure recovery (currently: cancel all tasks from that master)

  • JobManagers can deploy tasks only into slots they allocated.

  • Upon loss of connection to the JobManager, the TaskManager will try to re-register the slots at the new JobManager for that job (retrieved via the HA leader lookup). If no JobManager takes over within a moderate timeout period, the TaskManager releases the slots and makes them available again, and a backup JobManager that comes up later will have to re-request the slots from the ResourceManager.
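
The slot ownership rules above could look roughly like this on the TaskManager side. This is a minimal sketch under assumptions: `TaskManagerSlotTableSketch`, its methods, and the millisecond timeout are hypothetical, and the offer/accept handshake is reduced to a single `bindSlot` call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical TaskManager-side bookkeeping of which job owns which slot. */
final class TaskManagerSlotTableSketch {
    private final long jobManagerTimeoutMillis;
    private final Map<String, String> jobBySlot = new HashMap<>();  // slot id -> job id
    private final Map<String, Long> leaderLostAt = new HashMap<>(); // job id -> time of connection loss

    TaskManagerSlotTableSketch(long jobManagerTimeoutMillis) {
        this.jobManagerTimeoutMillis = jobManagerTimeoutMillis;
    }

    /** Ties a slot to a job once its JobManager has accepted the offer. */
    void bindSlot(String slotId, String jobId) {
        jobBySlot.put(slotId, jobId);
    }

    /** Tasks may only be deployed into slots that are tied to the deploying job. */
    boolean mayDeploy(String slotId, String jobId) {
        return jobId.equals(jobBySlot.get(slotId));
    }

    /** Starts the timeout when the connection to a job's JobManager is lost. */
    void jobManagerLost(String jobId, long nowMillis) {
        leaderLostAt.putIfAbsent(jobId, nowMillis);
    }

    /** A (new) JobManager for the job has taken over the slots in time. */
    void jobManagerRegistered(String jobId) {
        leaderLostAt.remove(jobId);
    }

    /** Frees the slots of jobs whose JobManager did not come back within the timeout. */
    List<String> releaseOrphanedSlots(long nowMillis) {
        List<String> freed = new ArrayList<>();
        Iterator<Map.Entry<String, String>> it = jobBySlot.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            Long lostAt = leaderLostAt.get(entry.getValue());
            if (lostAt != null && nowMillis - lostAt >= jobManagerTimeoutMillis) {
                freed.add(entry.getKey());
                it.remove();
            }
        }
        // Forget the expired timers so that a later allocation for the same job starts clean.
        leaderLostAt.values().removeIf(lostAt -> nowMillis - lostAt >= jobManagerTimeoutMillis);
        return freed;
    }
}
```

The freed slots returned by `releaseOrphanedSlots` are the ones the TaskManager would report to the ResourceManager as available again.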


JobManager Slot Pool

The JobManager has a SlotPool which holds the slots that were offered to it and accepted. The JobManager’s scheduler grabs slots from the SlotPool and can thus access all currently registered slots even if the ResourceManager is down.

The SlotPool is a modification of what is currently the InstanceManager.

The SlotPool will attempt to acquire new slots from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available, or it gets a decline from the ResourceManager, or a request times out, it fails the slot request.

The SlotPool releases slots that are unused to the ResourceManager. Slots count as unused if they are not used when the job is fully running (fully recovered).
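
A minimal sketch of the SlotPool behavior described above, assuming hypothetical names (`SlotProviderSketch`, `SlotPoolSketch`) and leaving out request timeouts: the pool first serves requests from the slots it holds and only falls back to the ResourceManager when it has none.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for the ResourceManager; empty means the request was declined. */
interface SlotProviderSketch {
    Optional<String> requestSlot();
}

/** Hypothetical JobManager-side pool of accepted slots. */
final class SlotPoolSketch {
    private final Deque<String> availableSlots = new ArrayDeque<>();
    private SlotProviderSketch resourceManager; // null while no ResourceManager is reachable

    void setResourceManager(SlotProviderSketch resourceManager) {
        this.resourceManager = resourceManager;
    }

    /** Adds a slot that a TaskManager offered and the JobManager accepted. */
    void addAcceptedSlot(String slotId) {
        availableSlots.add(slotId);
    }

    /** Serves from the pool if possible, otherwise asks the ResourceManager or fails. */
    String allocate() {
        String pooled = availableSlots.poll();
        if (pooled != null) {
            return pooled;
        }
        if (resourceManager == null) {
            throw new IllegalStateException("No ResourceManager available");
        }
        return resourceManager.requestSlot()
                .orElseThrow(() -> new IllegalStateException("ResourceManager declined the request"));
    }

    /** Hands back all slots that are currently unused, e.g. once the job is fully running. */
    List<String> releaseUnusedSlots() {
        List<String> unused = new ArrayList<>(availableSlots);
        availableSlots.clear();
        return unused;
    }
}
```

Since already accepted slots are served without contacting the ResourceManager, the scheduler in this sketch keeps working on pooled slots while the ResourceManager is down, and only requests for additional slots fail.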

Dispatcher

The new design includes the concept of a Dispatcher. The dispatcher accepts job submissions from clients and starts the jobs on their behalf on a cluster manager.

The dispatcher is introduced because:

  • Some cluster managers need a central job spawning and monitoring instance

  • It subsumes the role of the standalone JobManager, waiting for jobs to be submitted

In some setups, the dispatcher is optional (YARN) or not applicable (Kubernetes).
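
The role of the dispatcher can be illustrated with the following sketch, in which each submitted job gets its own JobManager. `DispatcherSketch` and the launcher function are hypothetical; how a JobManager is actually started depends on the cluster manager and is not modeled here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical dispatcher that spawns one JobManager per submitted job. */
final class DispatcherSketch {
    // Starts a JobManager for the given job name and returns a handle to it (assumption).
    private final Function<String, String> jobManagerLauncher;
    private final Map<String, String> jobManagerByJob = new HashMap<>();

    DispatcherSketch(Function<String, String> jobManagerLauncher) {
        this.jobManagerLauncher = jobManagerLauncher;
    }

    /** Accepts a job submission and starts a dedicated JobManager on the client's behalf. */
    String submit(String jobName) {
        if (jobManagerByJob.containsKey(jobName)) {
            throw new IllegalStateException("Job already running: " + jobName);
        }
        String handle = jobManagerLauncher.apply(jobName);
        jobManagerByJob.put(jobName, handle);
        return handle;
    }

    /** The JobManager goes away with its job; the dispatcher itself keeps running. */
    void jobFinished(String jobName) {
        jobManagerByJob.remove(jobName);
    }

    int runningJobs() {
        return jobManagerByJob.size();
    }
}
```

The dispatcher in this sketch only forwards the job to the launcher and never runs user code itself, which matches the idea of treating it as a long-lived cross-job service.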




In the long run, the dispatcher will also help with the following aspects:

  • The dispatcher is a cross-job service that can run a long-lived web dashboard

  • Future versions of the dispatcher should receive only HTTP calls and thus can act as a bridge in firewalled clusters

  • The dispatcher never executes code and can thus be viewed as a trusted process. It can run with higher privileges (superuser credentials) and spawn jobs on behalf of other users (acquiring their authentication tokens). Building on that, the dispatcher can manage user authentications

Fault Tolerance

The core recovery mechanism is still task restart and state restore from checkpoint.

The following aspects of fault tolerance are specific to the individual cluster managers and described in each section:

  • Detection and restart of the process(es) that execute the JobManager and ResourceManager

  • Recovery of the job’s JobGraph and libraries

 



Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • DataStream and DataSet API, including classes related to that, such as StreamExecutionEnvironment
  • Classes marked with the @Public annotation
  • On-disk binary formats, such as checkpoints/savepoints
  • User-facing scripts/command-line tools, i.e. bin/flink, Yarn scripts, Mesos scripts
  • Configuration settings
  • Exposed monitoring information


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

(none yet)
